Skip to content

Commit

Permalink
Scala Common Enrich: added weather enrichment (close #456)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy authored and alexanderdean committed Jan 14, 2016
1 parent 20c416b commit a0dab90
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 3 deletions.
16 changes: 16 additions & 0 deletions 3-enrich/config/enrichments/weather_enrichment_config.json
@@ -0,0 +1,16 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/weather_enrichment_config/jsonschema/1-0-0",

"data": {
"enabled": true,
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "weather_enrichment_config",
"parameters": {
"apiKey": "{{KEY}}",
"cacheSize": 5100,
"geoPrecision": 1,
"apiHost": "history.openweathermap.org",
"timeout": 5
}
}
}
2 changes: 2 additions & 0 deletions 3-enrich/scala-common-enrich/project/Dependencies.scala
Expand Up @@ -52,6 +52,7 @@ object Dependencies {
val json4s = "3.2.11"
val igluClient = "0.3.1"
val scalaForex = "0.3.0"
val scalaWeather = "0.1.0"
// Scala (test only)
val specs2 = "1.14"
val scalazSpecs2 = "0.1.2"
Expand Down Expand Up @@ -86,6 +87,7 @@ object Dependencies {
val json4sScalaz = "org.json4s" %% "json4s-scalaz" % V.json4s
val igluClient = "com.snowplowanalytics" % "iglu-scala-client" % V.igluClient
val scalaUri = "com.netaporter" %% "scala-uri" % V.scalaUri
val scalaWeather = "com.snowplowanalytics" %% "scala-weather" % V.scalaWeather
// Scala (test only)
val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
val scalazSpecs2 = "org.typelevel" %% "scalaz-specs2" % V.scalazSpecs2 % "test"
Expand Down
Expand Up @@ -56,6 +56,7 @@ object SnowplowCommonEnrichBuild extends Build {
Libraries.igluClient,
Libraries.scalaUri,
Libraries.scalaForex,
Libraries.scalaWeather,
// Scala (test only)
Libraries.specs2,
Libraries.scalazSpecs2,
Expand Down
Expand Up @@ -451,9 +451,22 @@ object EnrichmentManager {
case None => Nil
}

// Fetch weather context
val weatherContext = registry.getWeatherEnrichment match {
case Some(we) => {
we.getWeatherContext(
Option(event.geo_latitude),
Option(event.geo_longitude),
Option(event.derived_tstamp).map(EventEnrichments.fromTimestamp)).map(_.some)
}
case None => None.success
}

// Assemble array of derived contexts
val derived_contexts = List(uaParser).collect {
case Success(Some(context)) => context
} ++ List(weatherContext).collect {
case Success(Some(context)) => context
} ++ jsScript.getOrElse(Nil) ++ cookieExtractorContext

if (derived_contexts.size > 0) {
Expand Down Expand Up @@ -483,8 +496,9 @@ object EnrichmentManager {
jsScript.toValidationNel |@|
campaign |@|
shred |@|
extractSchema.toValidationNel) {
(_,_,_,_,_,_,_,_,_) => ()
extractSchema.toValidationNel |@|
weatherContext.toValidationNel) {
(_,_,_,_,_,_,_,_,_,_) => ()
}
(first |@| second) {
(_,_) => event
Expand Down
Expand Up @@ -49,12 +49,14 @@ import registry.{
JavascriptScriptEnrichment,
EventFingerprintEnrichment,
CookieExtractorEnrichment,
WeatherEnrichment,
UserAgentUtilsEnrichmentConfig,
UaParserEnrichmentConfig,
CurrencyConversionEnrichmentConfig,
JavascriptScriptEnrichmentConfig,
EventFingerprintEnrichmentConfig,
CookieExtractorEnrichmentConfig
CookieExtractorEnrichmentConfig,
WeatherEnrichmentConfig
}

import utils.ScalazJson4sUtils
Expand Down Expand Up @@ -150,6 +152,8 @@ object EnrichmentRegistry {
EventFingerprintEnrichmentConfig.parse(enrichmentConfig, schemaKey).map((nm, _).some)
} else if (nm == "cookie_extractor_config") {
CookieExtractorEnrichmentConfig.parse(enrichmentConfig, schemaKey).map((nm, _).some)
} else if (nm == "weather_enrichment_config") {
WeatherEnrichmentConfig.parse(enrichmentConfig, schemaKey).map((nm, _).some)
} else {
None.success // Enrichment is not recognized yet
}
Expand Down Expand Up @@ -264,6 +268,15 @@ case class EnrichmentRegistry(private val configs: EnrichmentMap) {
def getCookieExtractorEnrichment: Option[CookieExtractorEnrichment] =
getEnrichment[CookieExtractorEnrichment]("cookie_extractor_config")

/**
* Looks up the WeatherEnrichment registered under the
* "weather_enrichment_config" name.
*
* @return Option boxing the WeatherEnrichment instance
*         if it is present, or None if not
*/
def getWeatherEnrichment: Option[WeatherEnrichment] =
getEnrichment[WeatherEnrichment]("weather_enrichment_config")

/**
* Returns an Option boxing an Enrichment
* config value if present, or None if not
Expand Down
@@ -0,0 +1,182 @@
/*
* Copyright (c) 2012-2015 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics
package snowplow
package enrich
package common
package enrichments.registry

// Maven Artifact
import org.apache.maven.artifact.versioning.DefaultArtifactVersion

// Java
import java.lang.{ Float => JFloat }

// Scala
import scala.util.control.NonFatal

// Scalaz
import scalaz._
import Scalaz._

// Joda time
import org.joda.time.{ DateTime, DateTimeZone }

// json4s
import org.json4s.{ JValue, JObject, DefaultFormats }
import org.json4s.Extraction
import org.json4s.JsonDSL._

// Iglu
import iglu.client.SchemaKey

// Scala-Weather
import com.snowplowanalytics.weather.providers.openweather.OwmCacheClient
import com.snowplowanalytics.weather.providers.openweather.Responses._

// Iglu
import iglu.client.SchemaCriterion

// This project
import utils.ScalazJson4sUtils
import enrichments.EventEnrichments

/**
* Companion object. Lets us create a WeatherEnrichment instance from a JValue
*/
object WeatherEnrichmentConfig extends ParseableEnrichment {

  val supportedSchema = SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "weather_enrichment_config", "jsonschema", 1, 0)

  /**
   * Creates a WeatherEnrichment instance from a JValue.
   *
   * All five fields are read from the "parameters" object of the
   * configuration; the first field that cannot be extracted turns
   * the whole result into a failure.
   *
   * @param config    JSON containing configuration for the weather enrichment
   * @param schemaKey schema key of the configuration, checked via `isParseable`
   * @return a configured WeatherEnrichment, or the failure messages
   */
  def parse(config: JValue, schemaKey: SchemaKey): ValidatedNelMessage[WeatherEnrichment] =
    isParseable(config, schemaKey).flatMap { _ =>
      // The value handed over by isParseable is not needed: fields are read from `config` directly
      val enrichment = for {
        apiKey       <- ScalazJson4sUtils.extract[String](config, "parameters", "apiKey")
        cacheSize    <- ScalazJson4sUtils.extract[Int](config, "parameters", "cacheSize")
        geoPrecision <- ScalazJson4sUtils.extract[Int](config, "parameters", "geoPrecision")
        apiHost      <- ScalazJson4sUtils.extract[String](config, "parameters", "apiHost")
        timeout      <- ScalazJson4sUtils.extract[Int](config, "parameters", "timeout")
      } yield WeatherEnrichment(apiKey, cacheSize, geoPrecision, apiHost, timeout)

      enrichment.toValidationNel
    }
}


/**
* Contains weather enrichments based on geo coordinates and time
*
* @param apiKey weather provider API KEY
* @param cacheSize amount of days with prefetched weather
* @param geoPrecision rounding factor for floating-point geo lat/long, which allows
* the use of more spatially precise weather stamps
* @param apiHost address of weather provider's API host
* @param timeout timeout in seconds to fetch weather from server
*/
case class WeatherEnrichment(apiKey: String, cacheSize: Int, geoPrecision: Int, apiHost: String, timeout: Int) extends Enrichment {

  val version = new DefaultArtifactVersion("0.1.0")

  // Lazily built from the constructor parameters on first use
  private lazy val client = OwmCacheClient(apiKey, cacheSize, geoPrecision, apiHost, timeout)

  // Iglu URI attached to every weather context produced by this enrichment
  private val schemaUri = "iglu:org.openweathermap/weather/jsonschema/1-0-0"

  // Required implicitly by `Extraction.decompose`
  private implicit val formats = DefaultFormats

  /**
   * Build the weather context for a single event as a self-describing JSON.
   *
   * Any non-fatal exception raised along the way is turned into a failure
   * carrying the exception's string form, so the whole event will be
   * filtered out later on.
   *
   * Java Floats (JFloat) are accepted rather than Scala Floats because
   * converting a null `geo_latitude` or `geo_longitude` of an `EnrichedEvent`
   * to a Scala Float would throw a NullPointerException.
   *
   * @param latitude enriched event optional latitude (probably null)
   * @param longitude enriched event optional longitude (probably null)
   * @param time enriched event optional time (probably null)
   * @return weather stamp as self-describing JSON object, or an error message
   */
  def getWeatherContext(latitude: Option[JFloat], longitude: Option[JFloat], time: Option[DateTime]): Validation[String, JObject] =
    try {
      val weather = getWeather(latitude, longitude, time)
      weather.map(addSchema)
    } catch {
      case NonFatal(exc) => exc.toString.fail
    }

  /**
   * Fetch the weather stamp for the given place and time and render it as JSON.
   * Fails when any of the three inputs is missing.
   *
   * @param latitude enriched event optional latitude
   * @param longitude enriched event optional longitude
   * @param time enriched event optional time
   * @return weather stamp as JSON object, or an error message
   */
  private def getWeather(latitude: Option[JFloat], longitude: Option[JFloat], time: Option[DateTime]): Validation[String, JObject] =
    (latitude, longitude, time) match {
      case (Some(lat), Some(lon), Some(t)) =>
        // NOTE(review): epoch seconds are narrowed to Int here, which cannot hold
        // timestamps outside the 32-bit range
        val epochSeconds = (t.getMillis / 1000).toInt
        getCachedOrRequest(lat, lon, epochSeconds).flatMap(weatherToJson)
      case _ => s"One of required event fields missing. latitude: $latitude, longitude: $longitude, tstamp: $time".fail
    }

  /**
   * Transform a weather stamp and decompose it into a JSON object
   *
   * @param weatherStamp weather stamp as returned by the weather client
   * @return the transformed stamp as JSON object, or an error message
   */
  private def weatherToJson(weatherStamp: Weather): Validation[String, JObject] = {
    val transformedWeather = transformWeather(weatherStamp)
    Extraction.decompose(transformedWeather) match {
      case json: JObject => json.success
      case _ => s"Couldn't transform weather object $transformedWeather into JSON".fail // Shouldn't ever happen
    }
  }

  /**
   * Ask the client for the weather, converting its disjunction into
   * a validation whose failure side is the stringified error
   *
   * @param latitude event latitude
   * @param longitude event longitude
   * @param timestamp event timestamp
   * @return weather stamp, or an error message
   */
  private def getCachedOrRequest(latitude: Float, longitude: Float, timestamp: Int): Validation[String, Weather] =
    client.getCachedOrRequest(latitude, longitude, timestamp).validation match {
      case Success(weather) => weather.success
      case Failure(error) => error.toString.failure
    }

  /**
   * Wrap a weather context into a self-describing JSON
   *
   * @param context weather context as JSON Object
   * @return JSON Object with the Iglu URI under "schema" and the context under "data"
   */
  private def addSchema(context: JObject): JObject = {
    val schemaField = ("schema", schemaUri)
    val dataField = ("data", context)
    schemaField ~ dataField
  }

  /**
   * Apply all necessary transformations (currently only `dt`: epoch -> timestamp string)
   * from `weather.providers.openweather.Responses.Weather` to `TransformedWeather`
   * for further JSON decomposition
   *
   * @param origin original OpenWeatherMap Weather stamp
   * @return transformed weather
   */
  private[enrichments] def transformWeather(origin: Weather): TransformedWeather = {
    // `dt` is multiplied up to milliseconds before being handed to Joda-Time
    val epochMillis = origin.dt.toLong * 1000
    val time = new DateTime(epochMillis, DateTimeZone.UTC).toString
    TransformedWeather(
      main = origin.main,
      wind = origin.wind,
      clouds = origin.clouds,
      rain = origin.rain,
      snow = origin.snow,
      weather = origin.weather,
      dt = time)
  }
}

/**
* Copy of `com.snowplowanalytics.weather.providers.openweather.Responses.Weather` intended to
* execute typesafe (as opposed to JSON) transformation
*/
private[enrichments] case class TransformedWeather(
// `main` through `weather` are passed through unchanged by
// `WeatherEnrichment.transformWeather` from the original `Weather` stamp
main: MainInfo,
wind: Wind,
clouds: Clouds,
rain: Option[Rain],
snow: Option[Snow],
weather: List[WeatherCondition],
// The only transformed field: string form of a UTC Joda DateTime built
// from the original `dt` (which `transformWeather` treats as epoch seconds)
dt: String)

0 comments on commit a0dab90

Please sign in to comment.