Permalink
Browse files

Step 1: log every N items

  • Loading branch information...
albertpastrana committed Jun 18, 2015
0 parents commit d0e32be48427962d5d8f19c7b3db0d43c868ba73
Showing with 164 additions and 0 deletions.
  1. +64 −0 .gitignore
  2. +13 −0 build.sbt
  3. +1 −0 project/plugins.sbt
  4. +86 −0 src/main/scala/com/intenthq/wikidata/App.scala
@@ -0,0 +1,64 @@
# Created by https://www.gitignore.io
### Scala ###
*.class
*.log
# sbt specific
.cache
.history
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
### OSX ###
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### Intellij ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
*.iml
## Directory-based project format:
.idea/
## File-based project format:
*.ipr
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
@@ -0,0 +1,13 @@
// Project name used by sbt for the build and its artifacts.
name := "wikidata-akka-streams"

scalaVersion := "2.11.6"

// Extra resolver; fetched over HTTPS so resolved artifacts cannot be tampered
// with in transit (plain-HTTP resolvers are also rejected by newer sbt releases).
resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases"

// Command-line option parsing.
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"

// JSON parsing (Jackson backend).
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"

// Akka Streams (experimental release candidate).
libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC3"

scalacOptions in Test ++= Seq("-Yrangepos")

// Entry point used by `sbt run` and packaging.
mainClass in Compile := Some("com.intenthq.wikidata.App")
@@ -0,0 +1 @@
// Registers the sbt-idea plugin (com.github.mpeltonen), pinned to version 1.6.0.
// NOTE(review): presumably used to generate IntelliJ IDEA project files — confirm.
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")
@@ -0,0 +1,86 @@
package com.intenthq.wikidata
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods._
import scopt.OptionParser
import scala.io.{Source => ioSource}
import scala.util.Try
/**
 * Command-line entry point that streams a gzipped Wikidata JSON dump through an
 * Akka Streams pipeline, keeps the items that have a sitelink in at least one
 * of the requested languages, and logs every N-th such item.
 */
object App {

  /**
   * A Wikidata item reduced to what this tool needs.
   *
   * @param id    value of the item's `id` field
   * @param sites page title per requested language, keyed by the language code
   */
  case class WikidataElement(id: String, sites: Map[String, String])

  /**
   * Parsed command-line options.
   *
   * @param input gzipped dump to read; the `null` default is only a placeholder
   *              for the parser's initial value, the option itself is declared
   *              `required()` in [[execute]]
   * @param langs language codes to look up in each item's sitelinks
   */
  case class Config(input: File = null, langs: Seq[String] = Seq.empty)

  /** Parses the arguments, runs [[task]] and exits the JVM with its result code. */
  def main(args: Array[String]): Unit = {
    sys.exit(execute(args, task))
  }

  /**
   * Parses the command line and, when it is valid, hands the resulting
   * configuration to `task`.
   *
   * @param args raw command-line arguments
   * @param task work to run with the parsed configuration; its result is the exit code
   * @return the result of `task`, or 1 when the arguments could not be parsed
   */
  def execute(args: Array[String], task: (Config => Int)): Int = {
    val parser = new OptionParser[Config]("wikidata") {
      opt[File]('i', "input") action { (i, c) =>
        c.copy(input = i) } required() text "Wikidata JSON dump"
      opt[Seq[String]]('l', "languages") action { (l, c) =>
        c.copy(langs = l) } required() valueName "<language1>,<language2>,..." text "Languages to take into account"
    }

    parser.parse(args, Config()).map(task).getOrElse(1)
  }

  /**
   * Runs the whole pipeline (read, parse, log) and waits for it to finish.
   *
   * The stream runs asynchronously, so this method blocks on its materialized
   * future; returning earlier would let `main` call `sys.exit` while the dump
   * is still being processed.
   *
   * @param config parsed command-line options
   * @return 0 when the stream completed normally, 1 when opening the input or
   *         running the stream failed
   */
  def task(config: Config): Int = {
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import scala.util.{Failure, Success}

    implicit val system: ActorSystem = ActorSystem("wikidata-poc")
    implicit val materializer = ActorFlowMaterializer()

    try {
      // Building the source is inside the Try as well, so that a missing or
      // unreadable input file is reported as a failure exit code.
      val outcome = Try {
        val processed = source(config.input)
          .via(parseJson(config.langs))
          .runWith(logEveryNSink[WikidataElement](1000))
        // Blocking is acceptable here: this is the outermost edge of the program.
        Await.result(processed, Duration.Inf)
      }

      outcome match {
        case Success(_) => 0
        case Failure(error) =>
          Console.err.println(s"Processing failed: $error")
          1
      }
    } finally {
      // Always stop the actor system, otherwise its threads keep the JVM alive.
      system.shutdown()
    }
  }

  /**
   * Creates a source emitting the lines of a gzipped, UTF-8 encoded file,
   * skipping the first line.
   *
   * NOTE(review): the skipped first line is presumably the opening bracket of
   * the dump's top-level JSON array — confirm against the dump format.
   * NOTE(review): the underlying input stream is never closed explicitly; it
   * stays open until the process exits.
   *
   * @param file gzipped input file
   */
  def source(file: File): Source[String, Unit] = {
    val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
    val lines = ioSource.fromInputStream(compressed, "utf-8")
    Source(() => lines.getLines()).drop(1)
  }

  /**
   * Flow turning raw lines into [[WikidataElement]]s; lines for which
   * [[parseItem]] yields nothing are dropped.
   *
   * @param langs language codes to extract sitelinks for
   */
  def parseJson(langs: Seq[String]): Flow[String, WikidataElement, Unit] =
    Flow[String].mapConcat(line => parseItem(langs, line).toList)

  /**
   * Parses a single line into a [[WikidataElement]].
   *
   * @param langs language codes; for each one the title under
   *              `sitelinks.<lang>wiki.title` is looked up
   * @param line  one line of the dump
   * @return `None` when the line cannot be parsed as JSON, has no string `id`,
   *         or has no sitelink title for any of the requested languages
   */
  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
    // Lines that fail to parse are skipped on purpose rather than failing the stream.
    Try(parse(line)).toOption.flatMap { json =>
      json \ "id" match {
        case JString(itemId) =>
          val sites = for {
            lang <- langs
            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
          } yield lang -> title
          if (sites.isEmpty) None
          else Some(WikidataElement(id = itemId, sites = sites.toMap))
        case _ => None
      }
    }
  }

  /**
   * Sink that counts the elements it receives and prints every `n`-th one,
   * starting with the very first element (count 0).
   *
   * @param n logging interval; must be positive
   * @tparam T type of the elements being counted
   * @return a sink whose materialized future completes with the number of
   *         elements seen
   * @throws IllegalArgumentException if `n` is not positive
   */
  def logEveryNSink[T](n: Int): Sink[T, scala.concurrent.Future[Int]] = {
    // Fail fast with a clear message instead of dividing by zero on the first element.
    require(n > 0, s"n must be positive, got $n")
    Sink.fold(0) { (count, element: T) =>
      if (count % n == 0)
        println(s"Processing element $count: $element")
      count + 1
    }
  }
}

0 comments on commit d0e32be

Please sign in to comment.