Permalink
Browse files

Step 2: Parallelise all the things

  • Loading branch information...
albertpastrana committed Jun 18, 2015
1 parent d0e32be commit dce57805d745e009aa2283aaf1437ad4aa6dbbc3
Showing with 10 additions and 6 deletions.
  1. +10 −6 src/main/scala/com/intenthq/wikidata/App.scala
@@ -4,12 +4,13 @@ import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import akka.actor.ActorSystem
-import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.stream.{ActorFlowMaterializer, ActorFlowMaterializerSettings}
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods._
import scopt.OptionParser
+import scala.concurrent.{ExecutionContext, Future}
import scala.io.{Source => ioSource}
import scala.util.Try
@@ -40,10 +41,11 @@ object App {
}
def task(config: Config): Int = {
-implicit val system = ActorSystem("wikidata-poc")
-implicit val materializer = ActorFlowMaterializer()
-import system.dispatcher
+implicit val system = ActorSystem("wikidata-process")
+val settings = ActorFlowMaterializerSettings(system).withInputBuffer(1, 1)
+implicit val materializer = ActorFlowMaterializer(Some(settings))
+import scala.concurrent.ExecutionContext.Implicits.global
source(config.input)
.via(parseJson(config.langs))
.runWith(logEveryNSink(1000))
@@ -57,8 +59,10 @@ object App {
Source(() => source.getLines()).drop(1)
}
-def parseJson(langs: Seq[String]): Flow[String, WikidataElement, Unit] =
-Flow[String].mapConcat(line => parseItem(langs, line).toList)
+def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =
+Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect {
+case Some(v) => v
+}
def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
Try(parse(line)).toOption.flatMap { json =>

0 comments on commit dce5780

Please sign in to comment.