Skip to content

Commit

Permalink
For structured data, add the ability to skip schemas matching given criteria.
Browse files Browse the repository at this point in the history
Slightly different from filtering for unstructured events:

* For unstructured, filtering is implemented in `Transform` by filtering matching entities from events. It's executed for each entity, for each event.
* For structured, filtering is implemented in `NonAtomicFields.resolveTypes`. Resolving types is executed once per batch of events, so it's more efficient than doing it for each event in `Transform`.

We can't use this approach for unstructured events because we don't have the step of resolving types.
  • Loading branch information
pondzix committed Mar 18, 2024
1 parent 94d23b3 commit d07c1a3
Show file tree
Hide file tree
Showing 4 changed files with 454 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ package com.snowplowanalytics.snowplow.loaders.transform

import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

object NonAtomicFields {
Expand Down Expand Up @@ -47,18 +48,54 @@ object NonAtomicFields {

/** Resolves the merged column types for every entity schema family observed in a batch.
  *
  * Filtering by `filterCriteria` happens in two phases:
  *   1. Before fetching schemas from Iglu: matching sub-versions are dropped, and a whole
  *      family is removed if nothing remains.
  *   2. After building the `TypedTabledEntity`: filtering must wait until after the build,
  *      because the build requires a contiguous series of schemas in the family (no "holes"),
  *      otherwise the merged schema model could be invalid or incompatible.
  *
  * @param resolver Iglu resolver used to fetch the schemas of each family
  * @param entities schema families seen in the batch, with the sub-versions observed per family
  * @param filterCriteria schemas matching any of these criteria are skipped; an empty list
  *        disables filtering
  * @return the successfully typed entities together with any per-family failures
  */
def resolveTypes[F[_]: Sync: RegistryLookup](
  resolver: Resolver[F],
  entities: Map[TabledEntity, Set[SchemaSubVersion]],
  filterCriteria: List[SchemaCriterion]
): F[Result] =
  entities.toList
    .map { case (tabledEntity, subVersions) =>
      // First phase of entity filtering, before we fetch schemas from Iglu and create `TypedTabledEntity`.
      // If all sub-versions are filtered out, the whole family is removed.
      tabledEntity -> filterSubVersions(filterCriteria, tabledEntity, subVersions)
    }
    .filter { case (_, subVersions) =>
      // Remove the whole schema family if there is no sub-version left after filtering
      subVersions.nonEmpty
    }
    .traverse { case (tabledEntity, subVersions) =>
      SchemaProvider
        .fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
        .map(TypedTabledEntity.build(tabledEntity, subVersions, _))
        // Second phase of entity filtering.
        // We can't do it sooner based on a result of `fetchSchemasWithSameModel` because we can't have
        // 'holes' in the Iglu schema family when building typed entities. Otherwise we may end up with
        // an invalid and incompatible merged schema model. Here `TypedTabledEntity` is already properly
        // created using a contiguous series of schemas, so we can try to skip some sub-versions.
        .map { typedTabledEntity =>
          val filteredSubVersions =
            filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
          typedTabledEntity.copy(mergedVersions = filteredSubVersions)
        }
        .leftMap(ColumnFailure(tabledEntity, subVersions, _))
        .value
    }
    .map { eithers =>
      // Partition per-family outcomes into failures and successfully typed entities
      val (failures, good) = eithers.separate
      Result(good, failures)
    }

/** Drops every sub-version of the family whose schema key matches any of the filter criteria.
  * With no criteria configured, the input set is returned untouched.
  */
private def filterSubVersions(
  filterCriteria: List[SchemaCriterion],
  tabledEntity: TabledEntity,
  subVersions: Set[SchemaSubVersion]
): Set[SchemaSubVersion] =
  if (filterCriteria.isEmpty) subVersions
  else
    subVersions.filterNot { subVersion =>
      // Reconstruct the full schema key for this sub-version and test it against each criterion
      val key = TabledEntity.toSchemaKey(tabledEntity, subVersion)
      filterCriteria.exists(_.matches(key))
    }

/** True when the schema key matches none of the configured criteria (i.e. it should be kept). */
private def doesNotMatchCriteria(filterCriteria: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
  filterCriteria.forall(criterion => !criterion.matches(schemaKey))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "9-0-0"
},
"type": "object",
"properties": {
"col_z": {"type": "string"}
},
"required": ["col_z"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -105,7 +105,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -140,7 +140,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -176,7 +176,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -214,7 +214,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -253,7 +253,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -271,7 +271,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity2 -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(2)) and
(fields.map(_.tabledEntity) must contain(allOf(tabledEntity1, tabledEntity2)))
Expand Down Expand Up @@ -304,7 +304,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -354,7 +354,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -405,7 +405,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -421,7 +421,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 9))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -441,7 +441,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -461,7 +461,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand Down

0 comments on commit d07c1a3

Please sign in to comment.