Permalink
Browse files

Step 3: count pages with the same titles

  • Loading branch information...
albertpastrana committed Jun 18, 2015
1 parent dce5780 commit 9d0a00564b95abbf31580d0c28b7ac89b8fd16b9
Showing with 48 additions and 13 deletions.
  1. +48 −13 src/main/scala/com/intenthq/wikidata/App.scala
@@ -4,15 +4,15 @@ import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorFlowMaterializer, ActorFlowMaterializerSettings}
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods._
import scopt.OptionParser
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Future, ExecutionContext}
import scala.io.{Source => ioSource}
import scala.util.Try
import scala.util.{Success, Try, Failure}
object App {
@@ -41,15 +41,29 @@ object App {
}
def task(config: Config): Int = {
implicit val system = ActorSystem("wikidata-process")
val settings = ActorFlowMaterializerSettings(system).withInputBuffer(1, 1)
implicit val materializer = ActorFlowMaterializer(Some(settings))
import scala.concurrent.ExecutionContext.Implicits.global
source(config.input)
.via(parseJson(config.langs))
.runWith(logEveryNSink(1000))
.onComplete(x => system.shutdown())
implicit val system = ActorSystem("wikidata-poc")
implicit val materializer = ActorFlowMaterializer()
import system.dispatcher
val elements = source(config.input).via(parseJson(config.langs))
val g = FlowGraph.closed(count) { implicit b =>
sinkCount => {
import FlowGraph.Implicits._
val broadcast = b.add(Broadcast[WikidataElement](2))
elements ~> broadcast ~> logEveryNSink(1000)
broadcast ~> checkSameTitles(config.langs.toSet) ~> sinkCount
}
}
g.run().onComplete { x =>
x match {
case Success((t, f)) => printResults(t, f)
case Failure(tr) => println("Something went wrong")
}
system.shutdown()
}
0
}
@@ -87,4 +101,25 @@ object App {
x + 1
}
def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement]
.filter(_.sites.keySet == langs)
.map { x =>
val titles = x.sites.values
titles.forall( _ == titles.head)
}
def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0,0)) {
case ((t, f), true) => (t+1, f)
case ((t, f), false) => (t, f+1)
}
def printResults(t: Int, f: Int) = {
val message = s"""
| Number of items with the same title: $t
| Number of items with the different title: $f
| Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
""".stripMargin
println(message)
}
}

0 comments on commit 9d0a005

Please sign in to comment.