Skip to content

Commit

Permalink
Replace json4s with circe (closes #71)
Browse files Browse the repository at this point in the history
  • Loading branch information
asoltysik committed Aug 7, 2018
1 parent 0cc8114 commit fe486bf
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 190 deletions.
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ lazy val root = project
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.scalajHttp,
Dependencies.Libraries.json4sJackson,
Dependencies.Libraries.igluCore,
Dependencies.Libraries.igluCoreJson4s,
Dependencies.Libraries.circe,
Dependencies.Libraries.igluCoreCirce,
Dependencies.Libraries.mockito,
Dependencies.Libraries.specs2,
Dependencies.Libraries.scalaCheck
Dependencies.Libraries.scalaCheck,
Dependencies.Libraries.circeOptics
)
)
13 changes: 7 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ object Dependencies {
object V {
// Scala
val scalajHttp = "2.3.0"
val json4s = "3.2.11"
val igluCore = "0.2.0"
val circe = "0.9.3"

// Java (test only)
val mockito = "1.9.5"
Expand All @@ -31,15 +31,16 @@ object Dependencies {
object Libraries {
// Scala
val scalajHttp = "org.scalaj" %% "scalaj-http" % V.scalajHttp
val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s
val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore
val igluCoreJson4s = "com.snowplowanalytics" %% "iglu-core-json4s" % V.igluCore
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val circe = "io.circe" %% "circe-parser" % V.circe

// Java (test only)
val mockito = "org.mockito" % "mockito-all" % V.mockito % "test"
val mockito = "org.mockito" % "mockito-all" % V.mockito % "test"

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"
val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % "test"
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"
val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % "test"
val circeOptics = "io.circe" %% "circe-optics" % V.circe % "test"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

import scalaj.http._
import cats.syntax.either._

import io.circe.{Json, JsonObject}
import io.circe.parser.parse
import io.circe.syntax._

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import scalaj.http._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

Expand Down Expand Up @@ -77,16 +79,18 @@ object Ec2Metadata {
*
* @return future JSON object with identity data
*/
def getInstanceIdentity: Future[JObject] = {
def getInstanceIdentity: Future[Json] = {
val instanceIdentityDocument = getContent(InstanceIdentityUri)
instanceIdentityDocument.map { (resp: String) =>
parseOpt(resp) match {
case Some(jsonObject: JObject) =>
instanceIdentityDocument.map { resp: String =>
parse(resp).toOption.flatMap(_.asObject) match {
case Some(jsonObject) =>
val prepared = prepareEc2Context(jsonObject)
if (prepared.values.keySet.isEmpty) { throw new RuntimeException("Document contains no known keys") } else {
prepared
if (prepared.isEmpty) {
throw new RuntimeException("Document contains no known keys")
} else {
prepared.asJson
}
case _ =>
case None =>
throw new RuntimeException("Document can not be parsed")
}
}
Expand All @@ -98,23 +102,22 @@ object Ec2Metadata {
* @param url full url to the endpoint (usually http://169.254.169.254/latest/meta-data/)
* @return future JSON object with metadata
*/
def getMetadata(url: String): Future[JObject] = {
def getMetadata(url: String): Future[JsonObject] = {
val key = url.split("/").last
if (!url.endsWith("/")) { // Leaf
getContent(url).map { value =>
key -> JString(value)
}
getContent(url).map(value => JsonObject(key := value))
} else { // Node
val sublinks = getContents(url)
val subnodes: Future[List[JObject]] = sublinks.flatMap { links =>
val subnodes = sublinks.flatMap { links =>
Future.sequence {
links.map { link =>
getMetadata(url + link)
}
}
}
val mergedObject = subnodes.map { _.fold(JObject(Nil))(_.merge(_)) }
mergedObject.map(key -> _)
val mergedObject =
subnodes.map(_.fold(JsonObject.empty)((obj1, obj2) => JsonObject.fromMap(obj1.toMap ++ obj2.toMap)))
mergedObject.map(obj => JsonObject(key := obj))
}
}

Expand Down Expand Up @@ -177,8 +180,6 @@ object Ec2Metadata {
* @param context JSON object with EC2 context
* @return true if object is context
*/
private def prepareEc2Context(context: JObject): JObject =
context.filterField {
case (key, _) => instanceIdentityKeys.contains(key)
}
private def prepareEc2Context(context: JsonObject): JsonObject =
context.filterKeys(key => instanceIdentityKeys.contains(key))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal

import scalaj.http._
import cats.syntax.either._

import io.circe.Json
import io.circe.parser.parse
import io.circe.syntax._

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import scalaj.http._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

Expand All @@ -31,7 +33,7 @@ import scala.util.{Failure, Success}
* Module with parsing GCE-metadata logic
* @see https://cloud.google.com/compute/docs/storing-retrieving-metadata
*
* Unlike EC2 instance document, GCE does not provide an excerpt, but instad
* Unlike EC2 instance document, GCE does not provide an excerpt, but instead
* this module collect only meaningful properties
*/
object GceMetadata {
Expand Down Expand Up @@ -75,45 +77,53 @@ object GceMetadata {
getMetadata.map(SelfDescribingData(InstanceMetadataSchema, _))

/** Construct metadata context */
def getMetadata: Future[JObject] =
getString("cpu-platform")
.zip(getString("hostname"))
.zip(getString("id"))
.zip(getString("image"))
.zip(getString("machine-type"))
.zip(getString("name"))
.zip(getJson("tags"))
.zip(getString("zone"))
.zip(getDir("attributes/"))
.map {
case ((((((((cpuPlatform, hostname), id), image), machineType), name), tags), zone), attributes) =>
("cpuPlatform", cpuPlatform) ~
("hostname", hostname) ~
("id", id) ~
("image", image) ~
("machineType", machineType) ~
("name", name) ~
("tags", tags) ~
("zone", zone) ~
("attributes", attributes)
}
def getMetadata: Future[Json] =
for {
cpuPlatform <- getString("cpu-platform")
hostname <- getString("hostname")
id <- getString("id")
image <- getString("image")
machineType <- getString("machine-type")
name <- getString("name")
tags <- getJson("tags")
zone <- getString("zone")
attributes <- getDir("attributes/")
} yield
Json.obj(
"cpuPlatform" := cpuPlatform,
"hostname" := hostname,
"id" := id,
"image" := image,
"machineType" := machineType,
"name" := name,
"tags" := tags,
"zone" := zone,
"attributes" := attributes
)

def request(path: String) =
Http(InstanceMetadataUri + path).header("Metadata-Flavor", "Google")

private def getString(path: String): Future[String] =
Future(request(path).asString.body)

private def getJson(path: String): Future[JValue] =
Future(parse(request(path).asString.body)).map {
case JObject(Nil) => JNull
case JArray(Nil) => JNull
case other => other
}

private def getDir(path: String): Future[JValue] =
Future(parse(request(path + "?recursive=true").asString.body)).map {
case JObject(Nil) => JNull
case other => other
}
private def getJson(path: String): Future[Json] =
Future.fromTry(
parse(request(path).asString.body)
.map(
json =>
json.arrayOrObject(
json,
array => if (array.isEmpty) Json.Null else json,
obj => if (obj.isEmpty) Json.Null else json
))
.toTry
)

private def getDir(path: String): Future[Json] =
Future.fromTry(
parse(request(path + "?recursive=true").asString.body)
.map(json => json.withObject(obj => if (obj.isEmpty) Json.Null else json))
.toTry
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ package com.snowplowanalytics.snowplow.scalatracker

import java.util.Base64

import org.json4s._
import org.json4s.jackson.JsonMethods._

import scala.collection.mutable.{Map => MMap}

import io.circe.Json

import emitters.TEmitter.EmitterPayload

/**
Expand Down Expand Up @@ -68,9 +67,9 @@ private[scalatracker] class Payload {
* @param typeWhenEncoded Key to use if encodeBase64 is true
* @param typeWhenNotEncoded Key to use if encodeBase64 is false
*/
def addJson(json: JValue, encodeBase64: Boolean, typeWhenEncoded: String, typeWhenNotEncoded: String): Unit = {
def addJson(json: Json, encodeBase64: Boolean, typeWhenEncoded: String, typeWhenNotEncoded: String): Unit = {

val jsonString = compact(render(json))
val jsonString = json.noSpaces

if (encodeBase64) {
add(typeWhenEncoded, new String(Base64.getEncoder.encode(jsonString.getBytes(Encoding)), Encoding))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}

import org.json4s._
import org.json4s.JsonDSL._
import io.circe.Json
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.json4s.implicits._
import com.snowplowanalytics.iglu.core.circe.implicits._

import utils.{ErrorTracking, JsonUtils}
import emitters.TEmitter
import utils.ErrorTracking

/**
* Tracker class
Expand Down Expand Up @@ -112,7 +112,7 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB
private def addContexts(payload: Payload, contexts: Seq[SelfDescribingJson]): Payload =
if (contexts.nonEmpty) {
val contextsEnvelope: SelfDescribingJson =
SelfDescribingData(ContextsSchemaKey, JArray(contexts.toList.map(_.normalize)))
SelfDescribingData(ContextsSchemaKey, Json.fromValues(contexts.toIterable.map(_.normalize)))

payload.addJson(contextsEnvelope.normalize, encodeBase64, "cx", "co")
payload
Expand Down Expand Up @@ -309,13 +309,14 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB
contexts: List[SelfDescribingJson] = Nil,
timestamp: Option[Timestamp] = None): Tracker = {

val eventJson =
("sku" -> sku) ~
("name" -> name) ~
("category" -> category) ~
("unitPrice" -> unitPrice) ~
("quantity" -> quantity) ~
("currency" -> currency)
val eventJson = JsonUtils.jsonObjectWithoutNulls(
"sku" := sku,
"name" := name,
"category" := category,
"unitPrice" := unitPrice,
"quantity" := quantity,
"currency" := currency
)

trackSelfDescribingEvent(
SelfDescribingData(
Expand Down Expand Up @@ -347,13 +348,14 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB
contexts: List[SelfDescribingJson] = Nil,
timestamp: Option[Timestamp] = None): Tracker = {

val eventJson =
("sku" -> sku) ~
("name" -> name) ~
("category" -> category) ~
("unitPrice" -> unitPrice) ~
("quantity" -> quantity) ~
("currency" -> currency)
val eventJson = JsonUtils.jsonObjectWithoutNulls(
"sku" := sku,
"name" := name,
"category" := category,
"unitPrice" := unitPrice,
"quantity" := quantity,
"currency" := currency
)

trackSelfDescribingEvent(
SelfDescribingData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ class AsyncBatchEmitter private (ec: ExecutionContext,

// Start consumer thread synchronously trying to send events to collector
val worker = new Thread {
override def run() {
override def run(): Unit =
while (true) {
val batch = queue.take()
submit(queue, ec, callback, collector, batch)
}
}
}

worker.setDaemon(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ class AsyncEmitter private (ec: ExecutionContext, collector: CollectorParams, ca
val queue = new LinkedBlockingQueue[CollectorRequest]()

val worker = new Thread {
override def run() {
override def run(): Unit =
while (true) {
val event = queue.take()
submit(queue, ec, callback, collector, event)
}
}
}

worker.setDaemon(true)
Expand Down
Loading

0 comments on commit fe486bf

Please sign in to comment.