Skip to content

Commit

Permalink
validator tool (#358)
Browse files Browse the repository at this point in the history
* wip: validator

* add small validator in train

* add config validation tool
  • Loading branch information
shuttie committed Mar 31, 2022
1 parent 1955773 commit d8f433f
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import com.typesafe.sbt.packager.docker.{Cmd, DockerPermissionStrategy}

name := "metarank"

version := "0.2.6"
version := "0.2.6-SNAPSHOT-220331"

resolvers ++= Seq(
("maven snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/")
Expand Down
14 changes: 14 additions & 0 deletions doc/deploy/cli-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ $ java -jar metarank-assembly-x.x.x.jar
15:35:35.261 INFO ai.metarank.Main$ - - train: train the ranking ML model
15:35:35.262 INFO ai.metarank.Main$ - - upload: push latest feature values to redis
15:35:35.263 INFO ai.metarank.Main$ - - inference: run the inference API
15:35:35.263 INFO ai.metarank.Main$ - - validate: check config and data files for consistency
15:35:35.265 INFO ai.metarank.Main$ - - help: this help

```
Expand Down Expand Up @@ -91,4 +92,17 @@ Usage: Train [options]
--port <value> redis port, 6379 by default
--batch-size <value> write batch size, 1000 by default
--format <value> state encoding format, protobuf/json
```

### Validate

A useful tool to check your config file and data files for sanity.

```shell
Metarank validator tool
Usage: metarank validate <options>
Possible options:
--config <path> - Validate feature configuration file
--data <path> - Validate historical events dataset
--help - This help
```
2 changes: 1 addition & 1 deletion src/main/scala/ai/metarank/FeatureMapping.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class FeatureMapping(
val rankingValues = rankingFeatures.map(_.value(ranking, state))

for {
item <- ranking.items
item <- ranking.items.toList
} yield {
val weight = interactions.find(_.item == item.id).map(e => weights.getOrElse(e.`type`, 1.0)).getOrElse(0.0)
val values = itemFeatures.map(_.value(ranking, state, item))
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/ai/metarank/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ai.metarank.mode.bootstrap.Bootstrap
import ai.metarank.mode.inference.Inference
import ai.metarank.mode.train.Train
import ai.metarank.mode.upload.Upload
import ai.metarank.mode.validate.Validate
import ai.metarank.util.Logging
import cats.effect.{ExitCode, IO, IOApp}

Expand All @@ -13,6 +14,7 @@ object Main extends IOApp with Logging {
case "inference" :: tail => Inference.run(tail)
case "train" :: tail => Train.run(tail)
case "upload" :: tail => Upload.run(tail)
case "validate" :: tail => Validate.run(tail)
case "help" :: _ => printHelp()
case Nil => printHelp()

Expand All @@ -30,6 +32,7 @@ object Main extends IOApp with Logging {
_ <- IO(logger.info("- train: train the ranking ML model"))
_ <- IO(logger.info("- upload: push latest feature values to redis"))
_ <- IO(logger.info("- inference: run the inference API"))
_ <- IO(logger.info("- validate: check config and data files for consistency"))
_ <- IO(logger.info("- help: this help"))
} yield { ExitCode.Success }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class ImpressionInjectFunction(tpe: String, ttl: FiniteDuration)(implicit
): Unit = for {
r <- Option(ranking.value())
clicks <- Option(interactions.get()).map(_.asScala.toList)
ranks = r.items.map(_.id).zipWithIndex.toMap
ranks = r.items.toList.map(_.id).zipWithIndex.toMap
(maxClick, maxClickRank) <- clicks.flatMap(c => ranks.get(c.item).map(rank => c -> rank)).sortBy(-_._2).headOption
item <- r.items.take(maxClickRank + 1)
} {
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/ai/metarank/mode/train/Train.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Train extends IOApp with Logging {
config <- Config.load(cmd.config)
mapping <- IO { FeatureMapping.fromFeatureSchema(config.features, config.interactions) }
data <- loadData(cmd.input, mapping.datasetDescriptor)
_ <- validate(data)
} yield {
val (train, test) = split(data, cmd.split)
cmd.output.write(trainModel(train, test, cmd.booster, cmd.iterations))
Expand Down Expand Up @@ -55,4 +56,18 @@ object Train extends IOApp with Logging {
val model = booster.fit()
model.save()
}

def validate(ds: Dataset): IO[Unit] = {
if (ds.desc.features.isEmpty) {
IO.raiseError(DatasetValidationError("No features configured"))
} else if (ds.desc.features.size == 1) {
IO.raiseError(DatasetValidationError("Only single ML feature defined"))
} else if (ds.groups.isEmpty) {
IO.raiseError(DatasetValidationError("No click-throughs loaded"))
} else {
IO.unit
}
}

case class DatasetValidationError(msg: String) extends Exception(msg)
}
8 changes: 8 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/CheckResult.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ai.metarank.mode.validate

sealed trait CheckResult

object CheckResult {
case object SuccessfulCheck extends CheckResult
case class FailedCheck(reason: String) extends CheckResult
}
40 changes: 40 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/ConfigValidator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult._
import ai.metarank.util.Logging
import io.circe.{Json, JsonObject}
import io.circe.yaml.parser.parse

object ConfigValidator extends Logging {
def check(file: String): CheckResult = {
parse(file) match {
case Left(value) => FailedCheck(s"yaml syntax error: ${value}")
case Right(yaml) =>
yaml.asObject match {
case Some(obj) =>
logger.info("config file is a YAML object")
checkNonEmpty(obj, "interactions") match {
case SuccessfulCheck => checkNonEmpty(obj, "features")
case f: FailedCheck => f
}
case None => FailedCheck("config file is not an YAML dictionary")
}
}
}

def checkNonEmpty(obj: JsonObject, section: String): CheckResult = {
obj(section) match {
case Some(s) =>
logger.info(s"$section section exists")
s.asArray match {
case Some(list) if list.isEmpty => FailedCheck(s"'$section' section is empty")
case Some(_) =>
logger.info(s"$section section is not empty")
SuccessfulCheck
case None => FailedCheck(s"'$section' section is not a list")
}
case None => FailedCheck(s"'$section' section is missing in config")
}
}

}
52 changes: 52 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/EventFileValidator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult.{FailedCheck, SuccessfulCheck}
import ai.metarank.model.Event
import ai.metarank.model.Event.{InteractionEvent, MetadataEvent, RankingEvent}
import ai.metarank.util.Logging
import better.files.File
import org.apache.commons.io.IOUtils
import io.circe.parser._

import scala.collection.JavaConverters._
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

object EventFileValidator extends Logging {
def check(file: File): CheckResult = {
file.extension(includeDot = false) match {
case Some("gz") | Some("gzip") =>
logger.info("GZip compression detected")
val lines =
IOUtils.lineIterator(new GZIPInputStream(file.newFileInputStream), StandardCharsets.UTF_8).asScala.toList
checkContents(lines)
case Some("json") | Some("jsonl") =>
logger.info("No compression detected")
val lines = file.lineIterator.toList
checkContents(lines)
case other => FailedCheck(s"content type $other is not supported")
}
}

def checkContents(lines: List[String]): CheckResult = {
val parsed = lines.map(line => decode[Event](line))
val metadata = parsed.collect { case Right(m @ MetadataEvent(_, _, _, _, _)) =>
m
}
val ints = parsed.collect { case Right(i: InteractionEvent) =>
i
}
val rankings = parsed.collect { case Right(r: RankingEvent) => r }
val failed = parsed.collect { case Left(x) => x }
logger.info(s"total events: ${parsed.size}")
logger.info(s"metadata events: ${metadata.size}")
logger.info(s"interaction events: ${ints.size}")
logger.info(s"ranking events: ${rankings.size}")
logger.info(s"failed parsing events: ${failed.size}")
if (metadata.nonEmpty && rankings.nonEmpty && ints.nonEmpty && failed.isEmpty) {
SuccessfulCheck
} else {
FailedCheck("Problems with event consistency")
}
}
}
41 changes: 41 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/Validate.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ai.metarank.mode.validate

import ai.metarank.util.Logging
import better.files.File
import cats.effect.{ExitCode, IO, IOApp}

object Validate extends IOApp with Logging {
case class ValidationError(msg: String) extends Exception(msg)

override def run(args: List[String]): IO[ExitCode] = {
for {
_ <- args match {
case "--config" :: configPath :: Nil => checkConfig(File(configPath))
case "--data" :: dataPath :: Nil => checkData(File(dataPath))
case "--help" :: Nil => printHelp()
case Nil => printHelp()
case other =>
IO.raiseError(new IllegalArgumentException(s"argument $other is not supported, use '--help' for help"))
}
} yield { ExitCode.Success }
}

def printHelp(): IO[Unit] = IO {
logger.info("Metarank validator tool")
logger.info("Usage: metarank validate <options>")
logger.info("")
logger.info("Possible options:")
logger.info(" --config <path> - Validate feature configuration file")
logger.info(" --data <path> - Validate historical events dataset")
logger.info(" --help - This help")
}

def checkConfig(cfg: File): IO[Unit] = ConfigValidator.check(cfg.contentAsString) match {
case CheckResult.SuccessfulCheck => IO { logger.info("Config file is valid") }
case CheckResult.FailedCheck(reason) => IO.raiseError(ValidationError(reason))
}
def checkData(ds: File): IO[Unit] = EventFileValidator.check(ds) match {
case CheckResult.SuccessfulCheck => IO { logger.info("Data file is valid") }
case CheckResult.FailedCheck(reason) => IO.raiseError(ValidationError(reason))
}
}
3 changes: 2 additions & 1 deletion src/main/scala/ai/metarank/model/Event.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ai.metarank.model

import cats.data.NonEmptyList
import io.circe.generic.extras.Configuration
import io.circe.{Codec, Decoder, DecodingFailure, Encoder}
import io.findify.featury.model.Timestamp
Expand Down Expand Up @@ -35,7 +36,7 @@ object Event {
user: UserId,
session: SessionId,
fields: List[Field] = Nil,
items: List[ItemRelevancy],
items: NonEmptyList[ItemRelevancy],
tenant: String = "default"
) extends FeedbackEvent

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/ai/metarank/model/FeatureScope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object FeatureScope {
val scope = Scope("item")

override def tags(event: Event): Traversable[Tag] = event match {
case e: RankingEvent => e.items.map(item => Tag(scope, item.id.value))
case e: RankingEvent => e.items.toList.map(item => Tag(scope, item.id.value))
case e: InteractionEvent => Some(Tag(scope, e.item.value))
case e: MetadataEvent => Some(Tag(scope, e.item.value))
}
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/ai/metarank/feature/RelevancyTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import ai.metarank.model.Event.ItemRelevancy
import ai.metarank.model.ItemId
import ai.metarank.model.MValue.SingleValue
import ai.metarank.util.TestRankingEvent
import cats.data.NonEmptyList
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class RelevancyTest extends AnyFlatSpec with Matchers {
it should "extract relevancy" in {
val feature = RelevancyFeature(RelevancySchema("rel"))
val event = TestRankingEvent(Nil).copy(items =
List(
NonEmptyList.of(
ItemRelevancy(ItemId("p1"), 1),
ItemRelevancy(ItemId("p2"), 2)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ClickthroughJoinFunctionTest extends AnyFlatSpec with Matchers with FlinkT
.process(new ClickthroughJoinFunction())
.executeAndCollect(10)
result.map(_.ranking.id.value) shouldBe List("1")
result.flatMap(_.ranking.items.map(_.id.value)) shouldBe List("p1", "p2", "p3")
result.flatMap(_.ranking.items.toList.map(_.id.value)) shouldBe List("p1", "p2", "p3")
result.flatMap(_.interactions.map(_.item.value)) shouldBe List("p2")
}
}
83 changes: 83 additions & 0 deletions src/test/scala/ai/metarank/mode/validate/ConfigValidatorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult.{FailedCheck, SuccessfulCheck}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ConfigValidatorTest extends AnyFlatSpec with Matchers {
it should "accept valid file" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe SuccessfulCheck
}
it should "fail on empty file" in {
ConfigValidator.check("") shouldBe a[FailedCheck]
}

it should "fail on missing interactions" in {
val yaml =
"""features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on empty interactions" in {
val yaml =
"""interactions:
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on interactions being non-object" in {
val yaml =
"""interactions: true
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on missing features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
| """.stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on empty features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features:""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on non-obj features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features: true
|""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}
}

0 comments on commit d8f433f

Please sign in to comment.