diff --git a/tutorial/MatrixTutorial1.scala b/tutorial/MatrixTutorial1.scala new file mode 100644 index 0000000000..7d3340fa28 --- /dev/null +++ b/tutorial/MatrixTutorial1.scala @@ -0,0 +1,29 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + + +/* +* MatrixTutorial1.scala +* +* Loads a directed graph adjacency matrix where a[i,j] = 1 if there is an edge from a[i] to b[j] +* and compute the co-follows between any two nodes +* +* ../scripts/scald.rb --local MatrixTutorial1.scala --input data/graph.tsv --output data/cofollows.tsv +* +*/ + + +class CofollowsJob(args : Args) extends Job(args) { + + import Matrix._ + + val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + // compute the innerproduct of the adjacency matrix with itself + (adjacencyMatrix * adjacencyMatrix.transpose).write( Tsv( args("output") ) ) +} + diff --git a/tutorial/MatrixTutorial2.scala b/tutorial/MatrixTutorial2.scala new file mode 100644 index 0000000000..f3328db33b --- /dev/null +++ b/tutorial/MatrixTutorial2.scala @@ -0,0 +1,40 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + + +/* +* MatrixTutorial2.scala +* +* Loads a directed graph adjacency matrix where a[i,j] = 1 if there is an edge from a[i] to b[j] +* and returns a graph containing only the nodes with outdegree smaller than a given value +* +* ../scripts/scald.rb --local MatrixTutorial2.scala --input data/graph.tsv --maxOutdegree 1000 --output data/graphFiltered.tsv +* +*/ + + +class FilterOutdegreeJob(args : Args) extends Job(args) { + + import Matrix._ + + val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + // Each row corresponds to the outgoing edges so to compute the outdegree we sum out the columns + val outdegree = adjacencyMatrix.sumColVectors + + // We convert the column vector to a matrix object to be able to use the matrix method filterValues + // we make all non zero values into ones and then convert it back to column vector + val outdegreeFiltered = outdegree.toMatrix[Int](1) + .filterValues{ _ < args("maxOutdegree").toDouble } + .binarizeAs[Double].getCol(1) + + // We multiply on the left hand side with the diagonal matrix created from the column vector + // to keep only the rows with outdregree smaller than maxOutdegree + (outdegreeFiltered.diag * adjacencyMatrix).write(Tsv( args("output") ) ) + +} + diff --git a/tutorial/MatrixTutorial3.scala b/tutorial/MatrixTutorial3.scala new file mode 100644 index 0000000000..89be75433d --- /dev/null +++ b/tutorial/MatrixTutorial3.scala @@ -0,0 +1,39 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + + +/* +* MatrixTutorial3.scala +* +* Loads two directed graph adjacency matrices where a[i,j] = 1 if there is an edge from a[i] to b[j] +* and computes the intersection and the differences between the two +* +* ../scripts/scald.rb --local MatrixTutorial3.scala --input1 data/graph.tsv --input2 data/graph2.tsv --intersection data/intersection.tsv --leftDiff data/leftDiff.tsv --rightDiff data/rightDiff.tsv +* +*/ + + +class ComputeMatrixIntersectionJob(args : Args) extends Job(args) { + + import Matrix._ + + val adjacencyMatrix1 = Tsv( args("input1"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + val adjacencyMatrix2 = Tsv( args("input2"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + //zip puts creates a pair element out of corresponding elements in the two matrices + val intersection = adjacencyMatrix1 + .zip(adjacencyMatrix2) + .mapValues( pair => if (pair._1 > 0 && pair._2 > 0) 1.0 else 0.0 ) + .write(Tsv(args("intersection"))) + (adjacencyMatrix1 - intersection).write(Tsv(args("leftDiff"))) + (adjacencyMatrix2 - intersection).write(Tsv(args("rightDiff"))) + +} + diff --git a/tutorial/MatrixTutorial4.scala b/tutorial/MatrixTutorial4.scala new file mode 100644 index 0000000000..8597548a89 --- /dev/null +++ b/tutorial/MatrixTutorial4.scala @@ -0,0 +1,33 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + + +/* +* MatrixTutorial4.scala +* +* Loads a directed graph adjacency matrix where a[i,j] = 1 if there is an edge from a[i] to b[j] +* and computes the cosine of the angle between every two pairs of vectors +* +* ../scripts/scald.rb --local MatrixTutorial4.scala --input data/graph.tsv --output data/cosineSim.tsv +* +*/ + +class ComputeCosineJob(args : Args) extends Job(args) { + + import Matrix._ + + val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + // we compute the L2 normalized adjacency graph + val normMatrix = adjacencyMatrix.rowL2Normalize + + // we compute the innerproduct of the normalized matrix with itself + // which is equivalent with computing cosine: AA^T / ||A|| * ||A|| + (normMatrix * normMatrix.transpose).write( Tsv( args("output") ) ) + +} + diff --git a/tutorial/MatrixTutorial5.scala b/tutorial/MatrixTutorial5.scala new file mode 100644 index 0000000000..bccb996bfd --- /dev/null +++ b/tutorial/MatrixTutorial5.scala @@ -0,0 +1,44 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + + +/* +* MatrixTutorial5.scala +* +* Loads a directed graph adjacency matrix where a[i,j] = 1 if there is an edge from a[i] to b[j] +* and computes the jaccard similarity between any two pairs of vectors +* +* ../scripts/scald.rb --local MatrixTutorial5.scala --input data/graph.tsv --output data/jaccardSim.tsv +* +*/ + +class ComputeJaccardJob(args : Args) extends Job(args) { + + import Matrix._ + + val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) ) + .read + .toMatrix[Long,Long,Double]('user1, 'user2, 'rel) + + val aBinary = adjacencyMatrix.binarizeAs[Double] + + // intersectMat holds the size of the intersection of row(a)_i n row (b)_j + val intersectMat = aBinary * aBinary.transpose + val aSumVct = aBinary.sumColVectors + val bSumVct = aBinary.sumRowVectors + + //Using zip to repeat the row and column vectors values on the right hand + //for all non-zeroes on the left hand matrix + val xMat = intersectMat.zip(aSumVct).mapValues( pair => pair._2 ) + val yMat = intersectMat.zip(bSumVct).mapValues( pair => pair._2 ) + + val unionMat = xMat + yMat - intersectMat + //We are guaranteed to have Double both in the intersection and in the union matrix + intersectMat.zip(unionMat) + .mapValues( pair => pair._1 / pair._2 ) + .write(Tsv( args("output") )) + +} + diff --git a/tutorial/MatrixTutorial6.scala b/tutorial/MatrixTutorial6.scala new file mode 100644 index 0000000000..2e8108cebb --- /dev/null +++ b/tutorial/MatrixTutorial6.scala @@ -0,0 +1,40 @@ +package com.twitter.scalding.examples + +import com.twitter.scalding._ +import com.twitter.scalding.mathematics.Matrix + +/* +* MatrixTutorial6.scala +* +* Loads a document to word matrix where a[i,j] = freq of the word j in the document i +* computes the Tf-Idf score of each word w.r.t. to each document and keeps the top nrWords in each document +* (see http://en.wikipedia.org/wiki/Tf*idf for more info) +* +* ../scripts/scald.rb --local MatrixTutorial6.scala --input data/docBOW.tsv --nrWords 300 --output data/featSelectedMatrix.tsv +* +*/ + +class TfIdfJob(args : Args) extends Job(args) { + + import Matrix._ + + val docWordMatrix = Tsv( args("input"), ('doc, 'word, 'count) ) + .read + .toMatrix[Long,String,Double]('doc, 'word, 'count) + + // compute the overall document frequency of each row + val docFreq = docWordMatrix.sumRowVectors + + // compute the inverse document frequency vector + val invDocFreqVct = docFreq.toMatrix(1).rowL1Normalize.mapValues( x => log2(1/x) ) + + // zip the row vector along the entire document - word matrix + val invDocFreqMat = docWordMatrix.zip(invDocFreqVct.getRow(1)).mapValues( pair => pair._2 ) + + // multiply the term frequency with the inverse document frequency and keep the top nrWords + docWordMatrix.hProd(invDocFreqMat).topRowElems( args("nrWords").toInt ).write(Tsv( args("output") )) + + def log2(x : Double) = scala.math.log(x)/scala.math.log(2.0) + +} + diff --git a/tutorial/data/docBOW.tsv b/tutorial/data/docBOW.tsv new file mode 100644 index 0000000000..137fa13b63 --- /dev/null +++ b/tutorial/data/docBOW.tsv @@ -0,0 +1,9 @@ +1 hello 2 +1 twitter 1 +2 conversation 1 +2 celebrities 1 +2 twitter 1 +3 elections 1 +3 debate 1 +3 twitter 1 +3 political 1 diff --git a/tutorial/data/graph.tsv b/tutorial/data/graph.tsv index 1251bfddf6..3130da0514 100644 --- a/tutorial/data/graph.tsv +++ b/tutorial/data/graph.tsv @@ -1,3 +1,5 @@ 1 2 1 1 3 1 -2 3 1 +3 2 1 +4 2 2 + diff --git a/tutorial/data/graph2.tsv b/tutorial/data/graph2.tsv new file mode 100644 index 0000000000..1251bfddf6 --- /dev/null +++ b/tutorial/data/graph2.tsv @@ -0,0 +1,3 @@ +1 2 1 +1 3 1 +2 3 1 diff --git a/tutorial/data/helloDoc.txt b/tutorial/data/helloDoc.txt new file mode 100644 index 0000000000..d64ecdb119 --- /dev/null +++ b/tutorial/data/helloDoc.txt @@ -0,0 +1,3 @@ +1 Hello world +2 See ya soon world +3 Hello again world