Permalink
Browse files

Upgrade to Akka Streams RC4

Needed to rename `ActorFlowMaterializer` to `ActorMaterializer`.
Also, as `InputStreamSource` is now available we are using it.
  • Loading branch information...
albertpastrana committed Jun 25, 2015
1 parent 9d0a005 commit be7966fb79c0a1d6997f957e7fc0831a4b65ccc3
Showing with 11 additions and 8 deletions.
  1. +1 −1 build.sbt
  2. +10 −7 src/main/scala/com/intenthq/wikidata/App.scala
View
@@ -6,7 +6,7 @@ resolvers += "scalaz-bintray" at "http://dl.bintray.com/scalaz/releases"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC3"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC4"
scalacOptions in Test ++= Seq("-Yrangepos")
@@ -4,15 +4,17 @@ import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorMaterializer
import akka.stream.io.{Framing, InputStreamSource}
import akka.stream.scaladsl._
import akka.util.ByteString
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods._
import scopt.OptionParser
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}
import scala.io.{Source => ioSource}
import scala.util.{Success, Try, Failure}
import scala.util.{Failure, Success, Try}
object App {
@@ -42,7 +44,7 @@ object App {
def task(config: Config): Int = {
implicit val system = ActorSystem("wikidata-poc")
implicit val materializer = ActorFlowMaterializer()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val elements = source(config.input).via(parseJson(config.langs))
@@ -67,10 +69,11 @@ object App {
0
}
def source(file: File): Source[String, Unit] = {
def source(file: File): Source[String, Future[Long]] = {
val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
val source = ioSource.fromInputStream(compressed, "utf-8")
Source(() => source.getLines()).drop(1)
InputStreamSource(() => compressed)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
.map(x => x.decodeString("utf-8"))
}
def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =

0 comments on commit be7966f

Please sign in to comment.