[Streaming] Examples using Twitter's Algebird library #480

Merged
merged 7 commits into from Feb 22, 2013

4 participants
Contributor

MLnick commented Feb 19, 2013

This PR adds two examples for streaming that use monoids from Twitter's Algebird library:

  • HyperLogLog for approximate distinct object counting with low memory overhead
  • CountMinSketch for approximating object frequency in a stream as well as TopK or "heavy hitters" estimation

See https://groups.google.com/forum/?fromgroups=#!topic/spark-users/4ht9ndVaZQY
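To make the CountMinSketch idea above concrete, here is a minimal sketch in plain Scala. It is not Algebird's implementation: `SimpleCMS`, its `depth`/`width` parameters, and the `mix` hash are hypothetical names chosen for illustration, and the heavy-hitters (TopK) tracking mentioned above is left out. It only shows the two properties the example relies on: frequency estimates never fall below the true count, and two sketches can be combined by adding their tables cell by cell.

```scala
// Hypothetical, simplified count-min sketch over Long keys (not Algebird's API).
final case class SimpleCMS(depth: Int, width: Int, table: Vector[Vector[Long]]) {

  // Column for `key` in a given row; the row number acts as a per-row hash seed.
  private def column(row: Int, key: Long): Int =
    java.lang.Math.floorMod(SimpleCMS.mix(key ^ SimpleCMS.mix(row.toLong)), width.toLong).toInt

  // Returns a new sketch with one more occurrence of `key` recorded in every row.
  def add(key: Long): SimpleCMS =
    copy(table = table.zipWithIndex.map { case (cells, row) =>
      val col = column(row, key)
      cells.updated(col, cells(col) + 1L)
    })

  // Minimum over rows: collisions can only inflate a cell, so this never undercounts.
  def estimate(key: Long): Long =
    table.zipWithIndex.map { case (cells, row) => cells(column(row, key)) }.min

  // Merging two sketches of the same shape is cell-wise addition.
  def ++(that: SimpleCMS): SimpleCMS = {
    require(depth == that.depth && width == that.width, "sketch dimensions must match")
    copy(table = table.zip(that.table).map { case (a, b) =>
      a.zip(b).map { case (x, y) => x + y }
    })
  }
}

object SimpleCMS {
  def empty(depth: Int, width: Int): SimpleCMS =
    SimpleCMS(depth, width, Vector.fill(depth, width)(0L))

  // 64-bit mixing function (splitmix64 finalizer), used here as an illustrative hash.
  private def mix(x: Long): Long = {
    var z = x + 0x9E3779B97F4A7C15L
    z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L
    z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL
    z ^ (z >>> 31)
  }
}

object CmsDemo {
  def main(args: Array[String]): Unit = {
    val ids = Seq(1L, 2L, 2L, 3L, 3L, 3L)
    val cms = ids.foldLeft(SimpleCMS.empty(4, 64))((sketch, id) => sketch.add(id))
    println(s"estimated count for id 3: ${cms.estimate(3L)}")
  }
}
```

Because merging is plain addition, a sketch built per partition and then merged is identical to one built over the whole batch, which is what makes the structure usable as a monoid.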

@MLnick MLnick and 1 other commented on an outdated diff Feb 19, 2013

...ala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+ }
+
+ val DELTA = 1E-3
+ val EPS = 0.01
+ val SEED = 1
+ val PERC = 0.001
+ val TOPK = 10
+
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+ val stream = ssc.twitterStream(username, password, filters,
+ StorageLevel.MEMORY_ONLY_SER)
+
+ val users = stream.map(status => status.getUser.getId)
@MLnick

MLnick Feb 19, 2013

Contributor

A note about this: currently Algebird CMS only supports Long inputs. Since it uses hashing under the hood, it should be possible to accept any hashable input, as with HyperLogLog, but this is not currently supported.

So for now this example works on user ids, so running it over relatively small durations will not result in very heavily-skewed data (which is where the sketch will be most useful). If we could take String inputs then it would be more interesting as we could do TopK on hashtags (for example) which is likely to be a lot more skewed.
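One possible workaround for the Long-only limitation, not proposed in this PR, is to hash each string to a Long before counting and to keep a reverse lookup for reporting. The sketch below assumes a 64-bit FNV-1a hash over UTF-8 bytes; `StringKeys`, `toLongKey`, and `reverseLookup` are hypothetical names. Distinct strings can in principle collide on the same Long key, which would merge their counts.

```scala
// Hypothetical helper: map String keys (e.g. hashtags) to Long keys via 64-bit FNV-1a.
object StringKeys {
  private val OffsetBasis = 0xcbf29ce484222325L
  private val Prime = 0x100000001b3L

  // XOR each byte into the hash, then multiply by the FNV prime (FNV-1a order).
  def toLongKey(s: String): Long =
    s.getBytes("UTF-8").foldLeft(OffsetBasis) { (hash, b) => (hash ^ (b & 0xffL)) * Prime }

  // Reverse lookup so that results keyed by Long can be mapped back to the original text.
  def reverseLookup(values: Seq[String]): Map[Long, String] =
    values.map(v => toLongKey(v) -> v).toMap
}

object StringKeysDemo {
  def main(args: Array[String]): Unit = {
    val tags = Seq("#spark", "#scala", "#spark")
    val keys = tags.map(StringKeys.toLongKey) // these Longs could be fed to a Long-only sketch
    val names = StringKeys.reverseLookup(tags)
    keys.foreach(k => println(s"$k -> ${names(k)}"))
  }
}
```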

@tdas

tdas Feb 19, 2013

Contributor

This may be an important point that could confuse people. Can you add a line to the comment at the top?

@tdas tdas commented on the diff Feb 19, 2013

streaming/pom.xml
@@ -47,6 +47,16 @@
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
@tdas

tdas Feb 19, 2013

Contributor

Aah, twitter4j was missing in the streaming pom.xml! Thanks for fixing this!

@tdas tdas and 1 other commented on an outdated diff Feb 19, 2013

pyspark
@@ -36,4 +36,9 @@ if [[ "$SPARK_LAUNCH_WITH_SCALA" != "0" ]] ; then
export SPARK_LAUNCH_WITH_SCALA=1
fi
-exec "$PYSPARK_PYTHON" "$@"
+if [[ "$IPYTHON" = "1" ]] ; then
+ export PYSPARK_PYTHON="ipython"
@tdas

tdas Feb 19, 2013

Contributor

Why is this change necessary for the example?

@MLnick

MLnick Feb 19, 2013

Contributor

hmm, it is pulling in a commit from another PR: #454

I guess this somehow was not merged between master and the streaming branch.

@tdas

tdas Feb 19, 2013

Contributor

Aah, I can try to fix that by merging the master branch again to streaming.

@tdas

tdas Feb 20, 2013

Contributor

I merged all the latest changes from master, but the PR diff still shows this change, which is weird. I checked the pyspark file from your branch https://github.com/MLnick/spark/blob/streaming-eg-algebird/pyspark and from the mesos spark branch https://github.com/mesos/spark/blob/streaming/pyspark . Both seem to have the commit, so I am not sure why it is still showing up. Can you update your branch with all the changes from the mesos/spark streaming branch, just to be sure?

@tdas tdas commented on an outdated diff Feb 19, 2013

...ala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -0,0 +1,78 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird._
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/**
+ * Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's
+ * TwitterInputDStream
@tdas

tdas Feb 19, 2013

Contributor

Can you please add a more detailed comment about what this program does? That is, what CountMinSketch is, what it does, and what output to expect. This would help those who are new to the Algebird library.

@tdas tdas and 1 other commented on an outdated diff Feb 19, 2013

...ala/spark/streaming/examples/TwitterAlgebirdHLL.scala
+
+ /** Bit size parameter for HyperLogLog */
+ val BIT_SIZE = 12
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+ val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+
+ val users = stream.map(status => status.getUser.getId)
+
+ var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero
+ var userSet: Set[Long] = Set()
+
+ val approxUsers = users.mapPartitions(ids => {
+ val hll = new HyperLogLogMonoid(BIT_SIZE)
@tdas

tdas Feb 19, 2013

Contributor

A few words of comment explaining how the partial HLLs are used to update the global HLL would be nice, for those who are not familiar with HLLs.

@johnynek

johnynek Feb 20, 2013

You can reuse these monoids; they are immutable. So, if it is any clearer, you can move hll above and use its zero for globalHll as well.
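The partial-to-global update asked about in this thread can be sketched in plain Scala. This is a simplified HyperLogLog written for illustration, not Algebird's `HyperLogLogMonoid`: `SimpleHLL`, `merge`, and `mix` are hypothetical names, and nested `Seq`s stand in for the partitions of one batch. Each partition builds a partial sketch from a shared zero, the partials are merged by taking the per-register maximum, and the batch result is merged into a running global sketch.

```scala
// Hypothetical, simplified HyperLogLog (not Algebird's implementation).
final case class SimpleHLL(bits: Int, registers: Vector[Int]) {

  def add(id: Long): SimpleHLL = {
    val h = SimpleHLL.mix(id)
    val idx = (h >>> (64 - bits)).toInt // top `bits` bits select a register
    val rest = h << bits                // remaining bits determine the rank
    val rank = math.min(java.lang.Long.numberOfLeadingZeros(rest) + 1, 64 - bits + 1)
    copy(registers = registers.updated(idx, math.max(registers(idx), rank)))
  }

  // Merging keeps the larger rank per register, so merge order does not matter.
  def merge(that: SimpleHLL): SimpleHLL = {
    require(bits == that.bits, "bit sizes must match")
    copy(registers = registers.zip(that.registers).map { case (a, b) => math.max(a, b) })
  }

  // Standard raw estimate, with the linear-counting correction for small cardinalities.
  def estimate: Double = {
    val m = registers.size.toDouble
    val alpha = 0.7213 / (1.0 + 1.079 / m) // constant valid for m >= 128
    val raw = alpha * m * m / registers.map(r => math.pow(2.0, -r)).sum
    val zeros = registers.count(_ == 0)
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m / zeros) else raw
  }
}

object SimpleHLL {
  def zero(bits: Int): SimpleHLL = {
    require(bits >= 7 && bits <= 16, "this sketch assumes 7 <= bits <= 16")
    SimpleHLL(bits, Vector.fill(1 << bits)(0))
  }

  // 64-bit mixing function (splitmix64 finalizer), used here as an illustrative hash.
  private def mix(x: Long): Long = {
    var z = x + 0x9E3779B97F4A7C15L
    z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L
    z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL
    z ^ (z >>> 31)
  }
}

object HllDemo {
  def main(args: Array[String]): Unit = {
    val zero = SimpleHLL.zero(12) // one immutable zero shared by all partitions
    val partitions = Seq(Seq(1L, 2L, 3L), Seq(3L, 4L), Seq(5L, 1L))
    val partials = partitions.map(_.foldLeft(zero)((hll, id) => hll.add(id)))
    val batch = partials.reduce((a, b) => a.merge(b))
    var global = zero
    global = global.merge(batch) // repeated for every batch in a streaming job
    println(f"approximate distinct users: ${global.estimate}%.1f")
  }
}
```

Since `merge` is associative, commutative, and has `zero` as its identity, merging partials gives exactly the same registers as adding every id to a single sketch.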

@johnynek johnynek commented on an outdated diff Feb 20, 2013

...ala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+ val stream = ssc.twitterStream(username, password, filters,
+ StorageLevel.MEMORY_ONLY_SER)
+
+ val users = stream.map(status => status.getUser.getId)
+
+ var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero
+ var globalExact = Map[Long, Int]()
+ val mm = new MapMonoid[Long, Int]()
+
+ val approxTopUsers = users.mapPartitions(ids => {
+ val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
@johnynek

johnynek Feb 20, 2013

You should be able to move this above mapPartitions, I think. The cms doesn't have any internal mutable state.

Glad to see this pull req. Hope this helps CMS and HLL have more impact.

I agree that the CMS interface is suboptimal now. We are going to update it to support the same approach as HLL (probably in algebird 0.2.0). Let us know if there are any algorithms to add. I'd love to collaborate and share this code in Algebird (which we extracted from scalding).
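The suggestion to create the monoid once, outside `mapPartitions`, can be illustrated without Spark or Algebird. The sketch below uses a hypothetical `SimpleMonoid` trait and a `CountMapMonoid` that merges exact count maps, roughly the role `MapMonoid[Long, Int]` plays in the diff above; all names are assumptions for illustration. In a real Spark job the instance captured by the closure would also need to be serializable, which this sketch does not address.

```scala
// Hypothetical stand-in for a monoid: an identity element plus an associative combine.
trait SimpleMonoid[T] {
  def zero: T
  def plus(a: T, b: T): T
}

// Merges exact count maps by summing the counts of matching keys.
final class CountMapMonoid extends SimpleMonoid[Map[Long, Int]] {
  val zero: Map[Long, Int] = Map.empty
  def plus(a: Map[Long, Int], b: Map[Long, Int]): Map[Long, Int] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }
}

object ReuseDemo {
  // Created once, outside the per-partition function; safe to share because it is immutable.
  val mm = new CountMapMonoid

  // Per-partition work: fold ids into a partial count map using the shared instance.
  def countPartition(ids: Seq[Long]): Map[Long, Int] =
    ids.foldLeft(mm.zero)((acc, id) => mm.plus(acc, Map(id -> 1)))

  // Combine the partial results with the same instance.
  def countAll(partitions: Seq[Seq[Long]]): Map[Long, Int] =
    partitions.map(countPartition).foldLeft(mm.zero)(mm.plus)

  def main(args: Array[String]): Unit =
    println(countAll(Seq(Seq(1L, 2L), Seq(2L, 3L, 2L))))
}
```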

@sritchie sritchie commented on an outdated diff Feb 20, 2013

examples/pom.xml
@@ -20,11 +20,10 @@
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>3.0.3</version>
+ <groupId>com.twitter</groupId>
+ <artifactId>algebird-core_2.9.2</artifactId>
+ <version>0.1.8</version>
Contributor

MLnick commented Feb 21, 2013

@johnynek thanks for the comments! Look forward to 0.2.0 in that case since CMS with any hashable inputs will be neat. Also if I find some time I'd be happy to try a scalding version of the example.

tdas added a commit that referenced this pull request Feb 22, 2013

Merge pull request #480 from MLnick/streaming-eg-algebird
[Streaming] Examples using Twitter's Algebird library

@tdas tdas merged commit cfa65eb into mesos:streaming Feb 22, 2013

Contributor

tdas commented Feb 22, 2013

Thank you very much. This is a great addition.

sarahgerweck pushed a commit to AtScaleInc/spark2 that referenced this pull request Jan 22, 2014

Merge pull request #480 from pwendell/0.9-fixes
Handful of 0.9 fixes

This patch addresses a few fixes for Spark 0.9.0 based on the last release candidate.

@mridulm gets credit for reporting most of the issues here. Many of the fixes here are based on his work in #477 and follow up discussion with him.

pwendell added a commit to andyk/mesos-spark that referenced this pull request May 5, 2014

Improved build configuration
1, Fix SPARK-1441: compile spark core error with hadoop 0.23.x
2, Fix SPARK-1491: maven hadoop-provided profile fails to build
3, Fix org.scala-lang: * ,org.apache.avro:* inconsistent versions dependency
4, A modified on the sql/catalyst/pom.xml,sql/hive/pom.xml,sql/core/pom.xml (Four spaces formatted into two spaces)

Author: witgo <witgo@qq.com>

Closes #480 from witgo/format_pom and squashes the following commits:

03f652f [witgo] review commit
b452680 [witgo] Merge branch 'master' of https://github.com/apache/spark into format_pom
bee920d [witgo] revert fix SPARK-1629: Spark Core missing commons-lang dependence
7382a07 [witgo] Merge branch 'master' of https://github.com/apache/spark into format_pom
6902c91 [witgo] fix SPARK-1629: Spark Core missing commons-lang dependence
0da4bc3 [witgo] merge master
d1718ed [witgo] Merge branch 'master' of https://github.com/apache/spark into format_pom
e345919 [witgo] add avro dependency to yarn-alpha
77fad08 [witgo] Merge branch 'master' of https://github.com/apache/spark into format_pom
62d0862 [witgo] Fix org.scala-lang: * inconsistent versions dependency
1a162d7 [witgo] Merge branch 'master' of https://github.com/apache/spark into format_pom
934f24d [witgo] review commit
cf46edc [witgo] exclude jruby
06e7328 [witgo] Merge branch 'SparkBuild' into format_pom
99464d2 [witgo] fix maven hadoop-provided profile fails to build
0c6c1fc [witgo] Fix compile spark core error with hadoop 0.23.x
6851bec [witgo] Maintain consistent SparkBuild.scala, pom.xml