Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

95 lines (82 sloc) 3.974 kb
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
* <br>
* <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
* the example operates on Long IDs. Once the implementation supports other inputs (such as String),
* the same approach could be used for computing popular topics for example.
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
* percentage of the overall total count.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
*/
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
val SEED = 1
val PERC = 0.001
// K highest frequency elements to take
val TOPK = 10
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
val approxTopUsers = users.mapPartitions(ids => {
ids.map(id => cms.create(id))
}).reduce(_ ++ _)
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
approxTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
(id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
globalCMS ++= partial
val globalTopK = globalCMS.heavyHitters.map(id =>
(id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
partialTopK.mkString("[", ",", "]")))
println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
globalTopK.mkString("[", ",", "]")))
}
})
exactTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
{case (id, count) => (count, id)})
.sortByKey(ascending = false).take(TOPK)
globalExact = mm.plus(globalExact.toMap, partialMap)
val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
}
})
ssc.start()
}
}
Jump to Line
Something went wrong with that request. Please try again.