Skip to content
This repository has been archived by the owner on Apr 27, 2018. It is now read-only.

Commit

Permalink
Merge pull request #241 from yb1/checksum
Browse files Browse the repository at this point in the history
Changed output type to rdd
  • Loading branch information
ianmilligan1 committed Jul 31, 2016
2 parents ab72ae4 + 7106961 commit fe30633
Showing 1 changed file with 3 additions and 2 deletions.
@@ -1,5 +1,6 @@
package org.warcbase.spark.matchbox

+import org.apache.spark.SparkContext
import org.warcbase.spark.rdd.RecordRDD._
import org.apache.spark.rdd.RDD
import org.warcbase.spark.archive.io.ArchiveRecord
Expand All @@ -12,7 +13,7 @@ import org.warcbase.spark.archive.io.ArchiveRecord
* timeoutVal: time allowed to connect to each image
*/
object ExtractPopularImages {
-def apply(records: RDD[ArchiveRecord], limit: Int, minWidth: Int = 30, minHeight: Int = 30) = {
+def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = 30, minHeight: Int = 30) = {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getImageBytes), 1))
Expand All @@ -21,6 +22,6 @@ object ExtractPopularImages {
.reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
.takeOrdered(limit)(Ordering[Int].on(x => -x._2._3))
res.foreach(x => println(x._2._2 + "\t" + x._2._3))
-res
+sc.parallelize(res.map(x=>x._2._2 + "\t" + x._2._3), 1)
}
}

0 comments on commit fe30633

Please sign in to comment.