Cache Field (close #316)
Thanks Dilyan for introducing an in-memory registry in CachedFieldSpec

Co-authored-by: Dilyan Damyanov <dilyand@gmail.com>
lmath and dilyand committed Nov 29, 2022
1 parent a238409 commit 3ed5238
Showing 16 changed files with 378 additions and 102 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -1 +1,4 @@
target
.idea/
.bsp/

@@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.storage.bigquery.benchmark

import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.implicits.idClock
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.clocks.idClock

import cats.Id
import org.openjdk.jmh.annotations._
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit
class FromEventBenchmark {
@Benchmark
def fromEvent(state: States.ExampleEventState): Unit = {
LoaderRow.fromEvent[Id](state.resolver, state.processor)(state.baseEvent)
LoaderRow.fromEvent[Id](state.resolver, state.processor, state.fieldCache)(state.baseEvent)
()
}
}
@@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.storage.bigquery.benchmark

import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.implicits.idClock
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.clocks.idClock

import org.openjdk.jmh.annotations._

@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
class GroupContextsBenchmark {
@Benchmark
def groupContexts(state: States.ExampleEventState): Unit = {
LoaderRow.groupContexts(state.resolver, state.contexts)
LoaderRow.groupContexts(state.resolver, state.fieldCache, state.contexts)
()
}
}
@@ -24,7 +24,8 @@ object States {
val unstruct = baseEvent.copy(unstruct_event = SpecHelpers.events.adClickUnstructEvent)
val contexts = SpecHelpers.events.geoContexts
val resolver = SpecHelpers.iglu.resolver
val fieldCache = SpecHelpers.cache.fieldCache
val processor = SpecHelpers.meta.processor
val idClock = SpecHelpers.implicits.idClock
val idClock = SpecHelpers.clocks.idClock
}
}
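A hypothetical sketch of the new SpecHelpers.cache object referenced above (SpecHelpers itself is not part of this diff), modelled on the CreateLruMap usage in CachedFieldSpec further down; the nesting under SpecHelpers and the capacity of 100 are assumptions:

package com.snowplowanalytics.snowplow.storage.bigquery.common

import cats.Id
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
import com.snowplowanalytics.lrumap.CreateLruMap

object cache {
  // In-memory LRU map from (SchemaKey, StorageTime) to the BigQuery Field built from that
  // schema; capacity 100 mirrors the value used in CachedFieldSpec.
  val fieldCache: FieldCache[Id] = CreateLruMap[Id, FieldKey, Field].create(100)
}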
@@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.storage.bigquery.benchmark

import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.implicits.idClock
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.clocks.idClock

import cats.Id
import org.openjdk.jmh.annotations._
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit
class TransformJsonBenchmark {
@Benchmark
def transformJson(state: States.ExampleEventState): Unit = {
LoaderRow.transformJson[Id](state.resolver, state.adClickSchemaKey)(state.unstruct.unstruct_event.data.get.data)
LoaderRow.transformJson[Id](state.resolver, state.fieldCache, state.adClickSchemaKey)(state.unstruct.unstruct_event.data.get.data)
()
}
}
@@ -14,19 +14,21 @@ package com.snowplowanalytics.snowplow.storage.bigquery.common

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaKey}
import com.snowplowanalytics.iglu.schemaddl.bigquery._
import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Schema => DdlSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
import com.snowplowanalytics.snowplow.analytics.scalasdk.Data._
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Failure, Payload, FailureDetails}
import cats.Monad
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import cats.data.{EitherT, Validated, ValidatedNel, NonEmptyList}
import cats.effect.Clock
import cats.implicits._
import com.google.api.services.bigquery.model.TableRow
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverResult
import com.snowplowanalytics.iglu.client.resolver.StorageTime
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError
import io.circe.{Encoder, Json}
import io.circe.generic.semiauto._
import io.circe.syntax._
@@ -47,37 +49,39 @@ object LoaderRow {
* it will return a BadRow with a detailed error message.
* If this preliminary check was passed, but the row cannot be loaded for other reasons,
* it will be forwarded to a "failed inserts" topic, without additional information.
* @param igluClient A Resolver to be used for schema lookups.
* @param resolver A Resolver to be used for schema lookups.
* @param record An enriched TSV line.
* @return Either a BadRow with error messages or a row that is ready to be loaded.
*/
def parse[F[_]: Monad: RegistryLookup: Clock](igluClient: Resolver[F], processor: Processor)(
def parse[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], processor: Processor, fieldCache: FieldCache[F])(
record: String
): F[Either[BadRow, LoaderRow]] =
Event.parse(record) match {
case Validated.Valid(event) =>
fromEvent[F](igluClient, processor)(event)
fromEvent[F](resolver, processor, fieldCache)(event)
case Validated.Invalid(error) =>
val badRowError = BadRow.LoaderParsingError(processor, error, Payload.RawPayload(record))
Monad[F].pure(badRowError.asLeft)
}

/** Parse JSON object provided by Snowplow Analytics SDK */
def fromEvent[F[_]: Monad: RegistryLookup: Clock](igluClient: Resolver[F], processor: Processor)(
event: Event
): F[Either[BadRow, LoaderRow]] = {
def fromEvent[F[_]: Monad: RegistryLookup: Clock](
resolver: Resolver[F],
processor: Processor,
fieldCache: FieldCache[F]
)(event: Event): F[Either[BadRow, LoaderRow]] = {
val atomic = transformAtomic(event)
val contexts: F[Transformed] = groupContexts[F](igluClient, event.contexts.data.toVector)
val contexts: F[Transformed] = groupContexts[F](resolver, fieldCache, event.contexts.data.toVector)
val derivedContexts: F[Transformed] =
groupContexts(igluClient, event.derived_contexts.data.toVector)
groupContexts(resolver, fieldCache, event.derived_contexts.data.toVector)

val selfDescribingEvent: F[Transformed] = event
.unstruct_event
.data
.map {
case SelfDescribingData(schema, data) =>
val columnName = Schema.getColumnName(ShreddedType(UnstructEvent, schema))
transformJson[F](igluClient, schema)(data).map(_.map { row =>
transformJson[F](resolver, fieldCache, schema)(data).map(_.map { row =>
List((columnName, Adapter.adaptRow(row)))
})
}
@@ -135,14 +139,16 @@ object LoaderRow {

/** Group list of contexts by their full URI and transform values into ready to load rows */
def groupContexts[F[_]: Monad: RegistryLookup: Clock](
igluClient: Resolver[F],
resolver: Resolver[F],
fieldCache: FieldCache[F],
contexts: Vector[SelfDescribingData[Json]]
): F[ValidatedNel[FailureDetails.LoaderIgluError, List[(String, AnyRef)]]] = {
val grouped = contexts.groupBy(_.schema).map {
case (key, groupedContexts) =>
val contexts = groupedContexts.map(_.data) // Strip away URI
val columnName = Schema.getColumnName(ShreddedType(Contexts(CustomContexts), key))
val getRow = transformJson[F](igluClient, key)(_)
val contexts: Seq[Json] = groupedContexts.map(_.data) // Strip away URI
val columnName = Schema.getColumnName(ShreddedType(Contexts(CustomContexts), key))
val getRow = transformJson[F](resolver, fieldCache, key)(_)

contexts
.toList
.map(getRow)
@@ -156,19 +162,49 @@
* Get BigQuery-compatible table rows from data-only JSON payload
* Can be transformed into contexts (via Repeated) later; for now it only needs to remain ue-compatible
*/
def transformJson[F[_]: Monad: RegistryLookup: Clock](igluClient: Resolver[F], schemaKey: SchemaKey)(
def transformJson[F[_]: Monad: RegistryLookup: Clock](
resolver: Resolver[F],
fieldCache: FieldCache[F],
schemaKey: SchemaKey
)(
data: Json
): F[ValidatedNel[FailureDetails.LoaderIgluError, Row]] =
igluClient
.lookupSchema(schemaKey)
.map(
_.leftMap { e =>
NonEmptyList.one(FailureDetails.LoaderIgluError.IgluError(schemaKey, e))
}.flatMap(schema => DdlSchema.parse(schema).toRight(invalidSchema(schemaKey)))
.map(schema => Field.build("", schema, false))
.flatMap(field => Row.cast(field)(data).leftMap(e => e.map(castError(schemaKey))).toEither)
.toValidated
)
): F[Validated[NonEmptyList[LoaderIgluError], Row]] =
getSchemaAsField(resolver, schemaKey, fieldCache)
.value
.map(_.flatMap(field => Row.cast(field)(data).leftMap(e => e.map(castError(schemaKey))).toEither).toValidated)

private def schemaToField(schema: Json, schemaKey: SchemaKey): Either[NonEmptyList[LoaderIgluError], Field] =
DdlSchema.parse(schema).toRight(invalidSchema(schemaKey)).map(schema => Field.build("", schema, false))

private def lookupInFieldCache[F[_]: Monad](
fieldCache: FieldCache[F],
resolvedSchema: ResolverResult.Cached[SchemaKey, Json]
): EitherT[F, NonEmptyList[LoaderIgluError], Field] = {
val fieldKey: (SchemaKey, StorageTime) = (resolvedSchema.key, resolvedSchema.timestamp)

EitherT.liftF(fieldCache.get(fieldKey)).flatMap {
case Some(field) => EitherT.pure[F, NonEmptyList[LoaderIgluError]](field)
case None => {
val field = schemaToField(resolvedSchema.value, resolvedSchema.key)
field.toEitherT[F].semiflatTap(field => fieldCache.put(fieldKey, field))
}
}
}

private[common] def getSchemaAsField[F[_]: Monad: RegistryLookup: Clock](
resolver: Resolver[F],
schemaKey: SchemaKey,
fieldCache: FieldCache[F]
): EitherT[F, NonEmptyList[LoaderIgluError], Field] =
EitherT(resolver.lookupSchemaResult(schemaKey))
.leftMap(resolutionError => NonEmptyList.one(FailureDetails.LoaderIgluError.IgluError(schemaKey = schemaKey, resolutionError)))
.flatMap {
case cached: ResolverResult.Cached[SchemaKey, Json] =>
lookupInFieldCache(fieldCache, cached)
case ResolverResult.NotCached(result) => EitherT(
Monad[F].pure(schemaToField(result, schemaKey))
)
}

def castError(schemaKey: SchemaKey)(error: CastError): FailureDetails.LoaderIgluError =
error match {
@@ -0,0 +1,11 @@
package com.snowplowanalytics.snowplow.storage.bigquery

import com.snowplowanalytics.iglu.client.resolver.StorageTime
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
import com.snowplowanalytics.lrumap.LruMap

package object common {
type FieldKey = (SchemaKey, StorageTime)
type FieldCache[F[_]] = LruMap[F, FieldKey, Field]
}
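Illustrative only, not part of this commit: one way downstream code could build the new FieldCache and thread it through LoaderRow.parse. The parseAll helper, the Processor coordinates, and the cache capacity are invented for the sketch, and a Clock[Id] instance is assumed to be supplied by the caller, as the benchmarks do with SpecHelpers.clocks.idClock:

import cats.Id
import cats.effect.Clock
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.storage.bigquery.common.{FieldCache, FieldKey, LoaderRow}

object FieldCacheUsageSketch {
  // Build the cache once and reuse it for every record, so each schema's Field is
  // computed at most once per (SchemaKey, StorageTime) and then served from memory.
  val fieldCache: FieldCache[Id] = CreateLruMap[Id, FieldKey, Field].create(100)
  val processor: Processor = Processor("example-loader", "0.0.0") // placeholder coordinates

  def parseAll(resolver: Resolver[Id], lines: List[String])(implicit C: Clock[Id]): List[Either[BadRow, LoaderRow]] =
    lines.map(line => LoaderRow.parse[Id](resolver, processor, fieldCache)(line))
}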
@@ -0,0 +1,120 @@
package com.snowplowanalytics.snowplow.storage.bigquery.common

import cats.data.{EitherT, NonEmptyList}
import org.specs2.mutable.Specification
import cats.{Id, Monad}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
import com.snowplowanalytics.iglu.schemaddl.bigquery.Mode.Nullable
import com.snowplowanalytics.iglu.schemaddl.bigquery.Type.Record
import com.snowplowanalytics.iglu.schemaddl.bigquery.Type.String
import com.snowplowanalytics.iglu.schemaddl.bigquery.Type.Integer
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow.getSchemaAsField
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.iglu._
import com.snowplowanalytics.snowplow.storage.bigquery.common.SpecHelpers.clocks.stoppedTimeClock

import scala.concurrent.duration._

class CachedFieldSpec extends Specification {
val fieldForOriginalSchema = Field("", Record(List(Field("field1", String, Nullable))), Nullable)
val fieldForPatchedSchema =
Field("", Record(List(Field("field1", String, Nullable), Field("field2", Integer, Nullable))), Nullable)

val schemaKey = SchemaKey
.fromUri("iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0")
.getOrElse(throw new Exception("invalid schema key in cached field spec"))

private def getCache: FieldCache[Id] = CreateLruMap[Id, FieldKey, Field].create(100)

private def getOriginalResolver: Resolver[Id] = resolver(staticRegistry(cachedSchemas(`original schema - 1 field`)))

private def getPatchedResolver(originalResolver: Resolver[Id]): Resolver[Id] = {
val newRepos = List(staticRegistry(cachedSchemas(`patched schema - 2 fields`)))
originalResolver.copy(repos = newRepos)
}

private def getFieldWithTime(resolver: Resolver[Id], fieldCache: FieldCache[Id], t: Int) =
getSchemaAsField(resolver, schemaKey, fieldCache)(Monad[Id], RegistryLookup[Id], stoppedTimeClock(t.toLong))

private def getResult(in: EitherT[Id, NonEmptyList[LoaderIgluError], Field]) =
in.value.getOrElse(throw new Exception("we expected a field to be created but something went wrong"))

"Cached fields should be in sync with cached schemas/lists in iglu client" >> {

"(1) original schema only, 1 field cached" in {
val fieldCache = getCache
val resolver = getOriginalResolver

val result = getResult(getFieldWithTime(resolver, fieldCache, 1000))

result must beEqualTo(fieldForOriginalSchema)

//Field is cached after first call (1 second)
fieldCache.get((schemaKey, 1.second)) must beSome
}

"(2) original schema is patched between calls, no delay => original schema is still cached => 1 field in cache" in {
val fieldCache = getCache
val originalResolver = getOriginalResolver
val patchedResolver = getPatchedResolver(originalResolver)

//first call
getFieldWithTime(originalResolver, fieldCache, 1000)

//second call, same time
val result = getResult(getFieldWithTime(patchedResolver, fieldCache, 1000))

//no data from patched schema
result must beEqualTo(fieldForOriginalSchema)

//Field is cached after first call (1 second)
fieldCache.get((schemaKey, 1.second)) must beSome
}

"(3) schema is patched, delay between getSchemaAsField calls is less than cache TTL => original schema is still cached => 1 field cached" in {
val fieldCache = getCache
val originalResolver = getOriginalResolver
val patchedResolver = getPatchedResolver(originalResolver)

//first call
getFieldWithTime(originalResolver, fieldCache, 1000)

//second call, 2s later, less than 10s TTL
val result = getResult(getFieldWithTime(patchedResolver, fieldCache, 3000))

//no data from patched schema
result must beEqualTo(fieldForOriginalSchema)

//Field cached after first call (1 second)
fieldCache.get((schemaKey, 1.second)) must beSome

//Field is not cached after second call (3 seconds)
fieldCache.get((schemaKey, 3.second)) must beNone
}

"(4) schema is patched, delay between getSchemaAsField calls is greater than cache TTL => original schema is expired => using patched schema => 2 fields cached" in {
val fieldCache = getCache
val originalResolver = getOriginalResolver
val patchedResolver = getPatchedResolver(originalResolver)

//first call
getFieldWithTime(originalResolver, fieldCache, 1000)

//second call, 12s later - greater than 10s TTL
val result = getResult(getFieldWithTime(patchedResolver, fieldCache, 13000))

//Cache content expired, patched schema is fetched => expected field is based on the patched schema
result must beEqualTo(fieldForPatchedSchema)

//Field is cached after first call (1 second)
fieldCache.get((schemaKey, 1.second)) must beSome

//Field is cached after second call (13 seconds)
fieldCache.get((schemaKey, 13.second)) must beSome
}
}
}
