From 382bab10cab78a604c3da8e8691fb2f2abc66ef6 Mon Sep 17 00:00:00 2001
From: "P. Oscar Boykin"
Date: Fri, 9 Jan 2015 18:11:05 -0600
Subject: [PATCH] Initial sketch of using Algebird with spark

---
 .../com/twitter/algebird/spark/package.scala  | 55 +++++++++++++++++++
 project/Build.scala                           |  4 ++
 2 files changed, 59 insertions(+)
 create mode 100644 algebird-spark/src/main/scala/com/twitter/algebird/spark/package.scala

diff --git a/algebird-spark/src/main/scala/com/twitter/algebird/spark/package.scala b/algebird-spark/src/main/scala/com/twitter/algebird/spark/package.scala
new file mode 100644
index 000000000..197759626
--- /dev/null
+++ b/algebird-spark/src/main/scala/com/twitter/algebird/spark/package.scala
@@ -0,0 +1,55 @@
+package com.twitter.algebird
+
+import org.apache.spark.rdd.{ RDD, PairRDDFunctions }
+
+import scala.reflect.ClassTag
+import com.twitter.algebird._
+
+/**
+ * To use this, you probably want:
+ * import com.twitter.algebird.spark._
+ */
+package object spark {
+  /**
+   * This adds methods to Spark RDDs to use Algebird
+   */
+  implicit class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
+    /**
+     * Apply an Aggregator to return a single value for the whole RDD.
+     * No zero value is available here, so RDD.reduce is expected to fail on an empty RDD.
+     */
+    def aggregatorOnAll[B: ClassTag, C](agg: Aggregator[T, B, C]): C =
+      agg.present(rdd.map(agg.prepare).reduce(agg.reduce))
+
+    /**
+     * Apply an Aggregator to the values of each key of an RDD of (key, value) pairs.
+     * mapValues is used instead of map so that any existing partitioner is kept:
+     * input that is already partitioned by key needs no extra shuffle, and the
+     * result stays partitioned by key.
+     */
+    def aggregatorByKey[K: ClassTag, A: ClassTag, B: ClassTag, C](agg: Aggregator[A, B, C])(implicit ev: T <:< (K, A), ordK: Ordering[K] = null): RDD[(K, C)] = {
+      // keyed casts the RDD itself, so ev is never captured by a serialized closure
+      val prepared = new PairRDDFunctions(keyed[K, A]).mapValues { a => agg.prepare(a) }
+      val reduced = new PairRDDFunctions(prepared).reduceByKey(agg.reduce)
+      new PairRDDFunctions(reduced).mapValues { b => agg.present(b) }
+    }
+
+    /**
+     * View this RDD as an RDD of pairs. ev is only compile-time evidence that T is
+     * a (K, V); it is not applied, the RDD is cast instead.
+     */
+    private def keyed[K, V](implicit ev: T <:< (K, V)): RDD[(K, V)] = rdd.asInstanceOf[RDD[(K, V)]]
+
+    /**
+     * Use the implicit semigroup to sum by keys
+     */
+    def semigroupByKey[K: ClassTag, V: ClassTag: Semigroup](implicit ev: T <:< (K, V), ord: Ordering[K] = null): RDD[(K, V)] =
+      new PairRDDFunctions(keyed[K, V]).reduceByKey(implicitly[Semigroup[V]].plus)
+
+    /**
+     * Use the implicit semigroup to sum all items.
+     * A Semigroup has no zero, so RDD.reduce is expected to fail on an empty RDD.
+     */
+    def semigroupOnAll(implicit sg: Semigroup[T]): T = rdd.reduce(sg.plus)
+  }
+}
diff --git a/project/Build.scala b/project/Build.scala
index c7d827642..1efcc00aa 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -160,6 +160,10 @@ object AlgebirdBuild extends Build {
   lazy val algebirdBijection = module("bijection").settings(
     libraryDependencies += "com.twitter" %% "bijection-core" % "0.7.0"
   ).dependsOn(algebirdCore, algebirdTest % "test->compile")
+
+  lazy val algebirdSpark = module("spark").settings(
+    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
+  ).dependsOn(algebirdCore, algebirdTest % "test->compile")
 }