Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validator tool #358

Merged
merged 3 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import com.typesafe.sbt.packager.docker.{Cmd, DockerPermissionStrategy}

name := "metarank"

version := "0.2.6"
version := "0.2.6-SNAPSHOT-220331"

resolvers ++= Seq(
("maven snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/")
Expand Down
14 changes: 14 additions & 0 deletions doc/deploy/cli-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ $ java -jar metarank-assembly-x.x.x.jar
15:35:35.261 INFO ai.metarank.Main$ - - train: train the ranking ML model
15:35:35.262 INFO ai.metarank.Main$ - - upload: push latest feature values to redis
15:35:35.263 INFO ai.metarank.Main$ - - inference: run the inference API
15:35:35.263 INFO ai.metarank.Main$ - - validate: check config and data files for consistency
15:35:35.265 INFO ai.metarank.Main$ - - help: this help

```
Expand Down Expand Up @@ -91,4 +92,17 @@ Usage: Train [options]
--port <value> redis port, 6379 by default
--batch-size <value> write batch size, 1000 by default
--format <value> state encoding format, protobuf/json
```

### Validate

A useful tool to check your config file and data files for sanity.

```shell
Metarank validator tool
Usage: metarank validate <options>
Possible options:
--config <path> - Validate feature configuration file
--data <path> - Validate historical events dataset
--help - This help
```
2 changes: 1 addition & 1 deletion src/main/scala/ai/metarank/FeatureMapping.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class FeatureMapping(
val rankingValues = rankingFeatures.map(_.value(ranking, state))

for {
item <- ranking.items
item <- ranking.items.toList
} yield {
val weight = interactions.find(_.item == item.id).map(e => weights.getOrElse(e.`type`, 1.0)).getOrElse(0.0)
val values = itemFeatures.map(_.value(ranking, state, item))
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/ai/metarank/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ai.metarank.mode.bootstrap.Bootstrap
import ai.metarank.mode.inference.Inference
import ai.metarank.mode.train.Train
import ai.metarank.mode.upload.Upload
import ai.metarank.mode.validate.Validate
import ai.metarank.util.Logging
import cats.effect.{ExitCode, IO, IOApp}

Expand All @@ -13,6 +14,7 @@ object Main extends IOApp with Logging {
case "inference" :: tail => Inference.run(tail)
case "train" :: tail => Train.run(tail)
case "upload" :: tail => Upload.run(tail)
case "validate" :: tail => Validate.run(tail)
case "help" :: _ => printHelp()
case Nil => printHelp()

Expand All @@ -30,6 +32,7 @@ object Main extends IOApp with Logging {
_ <- IO(logger.info("- train: train the ranking ML model"))
_ <- IO(logger.info("- upload: push latest feature values to redis"))
_ <- IO(logger.info("- inference: run the inference API"))
_ <- IO(logger.info("- validate: check config and data files for consistency"))
_ <- IO(logger.info("- help: this help"))
} yield { ExitCode.Success }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class ImpressionInjectFunction(tpe: String, ttl: FiniteDuration)(implicit
): Unit = for {
r <- Option(ranking.value())
clicks <- Option(interactions.get()).map(_.asScala.toList)
ranks = r.items.map(_.id).zipWithIndex.toMap
ranks = r.items.toList.map(_.id).zipWithIndex.toMap
(maxClick, maxClickRank) <- clicks.flatMap(c => ranks.get(c.item).map(rank => c -> rank)).sortBy(-_._2).headOption
item <- r.items.take(maxClickRank + 1)
} {
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/ai/metarank/mode/train/Train.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Train extends IOApp with Logging {
config <- Config.load(cmd.config)
mapping <- IO { FeatureMapping.fromFeatureSchema(config.features, config.interactions) }
data <- loadData(cmd.input, mapping.datasetDescriptor)
_ <- validate(data)
} yield {
val (train, test) = split(data, cmd.split)
cmd.output.write(trainModel(train, test, cmd.booster, cmd.iterations))
Expand Down Expand Up @@ -55,4 +56,18 @@ object Train extends IOApp with Logging {
val model = booster.fit()
model.save()
}

def validate(ds: Dataset): IO[Unit] = {
if (ds.desc.features.isEmpty) {
IO.raiseError(DatasetValidationError("No features configured"))
} else if (ds.desc.features.size == 1) {
IO.raiseError(DatasetValidationError("Only single ML feature defined"))
} else if (ds.groups.isEmpty) {
IO.raiseError(DatasetValidationError("No click-throughs loaded"))
} else {
IO.unit
}
}

case class DatasetValidationError(msg: String) extends Exception(msg)
}
8 changes: 8 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/CheckResult.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ai.metarank.mode.validate

sealed trait CheckResult

object CheckResult {
case object SuccessfulCheck extends CheckResult
case class FailedCheck(reason: String) extends CheckResult
}
40 changes: 40 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/ConfigValidator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult._
import ai.metarank.util.Logging
import io.circe.{Json, JsonObject}
import io.circe.yaml.parser.parse

object ConfigValidator extends Logging {
def check(file: String): CheckResult = {
parse(file) match {
case Left(value) => FailedCheck(s"yaml syntax error: ${value}")
case Right(yaml) =>
yaml.asObject match {
case Some(obj) =>
logger.info("config file is a YAML object")
checkNonEmpty(obj, "interactions") match {
case SuccessfulCheck => checkNonEmpty(obj, "features")
case f: FailedCheck => f
}
case None => FailedCheck("config file is not an YAML dictionary")
}
}
}

def checkNonEmpty(obj: JsonObject, section: String): CheckResult = {
obj(section) match {
case Some(s) =>
logger.info(s"$section section exists")
s.asArray match {
case Some(list) if list.isEmpty => FailedCheck(s"'$section' section is empty")
case Some(_) =>
logger.info(s"$section section is not empty")
SuccessfulCheck
case None => FailedCheck(s"'$section' section is not a list")
}
case None => FailedCheck(s"'$section' section is missing in config")
}
}

}
52 changes: 52 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/EventFileValidator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult.{FailedCheck, SuccessfulCheck}
import ai.metarank.model.Event
import ai.metarank.model.Event.{InteractionEvent, MetadataEvent, RankingEvent}
import ai.metarank.util.Logging
import better.files.File
import org.apache.commons.io.IOUtils
import io.circe.parser._

import scala.collection.JavaConverters._
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

object EventFileValidator extends Logging {
def check(file: File): CheckResult = {
file.extension(includeDot = false) match {
case Some("gz") | Some("gzip") =>
logger.info("GZip compression detected")
val lines =
IOUtils.lineIterator(new GZIPInputStream(file.newFileInputStream), StandardCharsets.UTF_8).asScala.toList
checkContents(lines)
case Some("json") | Some("jsonl") =>
logger.info("No compression detected")
val lines = file.lineIterator.toList
checkContents(lines)
case other => FailedCheck(s"content type $other is not supported")
}
}

def checkContents(lines: List[String]): CheckResult = {
val parsed = lines.map(line => decode[Event](line))
val metadata = parsed.collect { case Right(m @ MetadataEvent(_, _, _, _, _)) =>
m
}
val ints = parsed.collect { case Right(i: InteractionEvent) =>
i
}
val rankings = parsed.collect { case Right(r: RankingEvent) => r }
val failed = parsed.collect { case Left(x) => x }
logger.info(s"total events: ${parsed.size}")
logger.info(s"metadata events: ${metadata.size}")
logger.info(s"interaction events: ${ints.size}")
logger.info(s"ranking events: ${rankings.size}")
logger.info(s"failed parsing events: ${failed.size}")
if (metadata.nonEmpty && rankings.nonEmpty && ints.nonEmpty && failed.isEmpty) {
SuccessfulCheck
} else {
FailedCheck("Problems with event consistency")
}
}
}
41 changes: 41 additions & 0 deletions src/main/scala/ai/metarank/mode/validate/Validate.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ai.metarank.mode.validate

import ai.metarank.util.Logging
import better.files.File
import cats.effect.{ExitCode, IO, IOApp}

object Validate extends IOApp with Logging {
case class ValidationError(msg: String) extends Exception(msg)

override def run(args: List[String]): IO[ExitCode] = {
for {
_ <- args match {
case "--config" :: configPath :: Nil => checkConfig(File(configPath))
case "--data" :: dataPath :: Nil => checkData(File(dataPath))
case "--help" :: Nil => printHelp()
case Nil => printHelp()
case other =>
IO.raiseError(new IllegalArgumentException(s"argument $other is not supported, use '--help' for help"))
}
} yield { ExitCode.Success }
}

def printHelp(): IO[Unit] = IO {
logger.info("Metarank validator tool")
logger.info("Usage: metarank validate <options>")
logger.info("")
logger.info("Possible options:")
logger.info(" --config <path> - Validate feature configuration file")
logger.info(" --data <path> - Validate historical events dataset")
logger.info(" --help - This help")
}

def checkConfig(cfg: File): IO[Unit] = ConfigValidator.check(cfg.contentAsString) match {
case CheckResult.SuccessfulCheck => IO { logger.info("Config file is valid") }
case CheckResult.FailedCheck(reason) => IO.raiseError(ValidationError(reason))
}
def checkData(ds: File): IO[Unit] = EventFileValidator.check(ds) match {
case CheckResult.SuccessfulCheck => IO { logger.info("Data file is valid") }
case CheckResult.FailedCheck(reason) => IO.raiseError(ValidationError(reason))
}
}
3 changes: 2 additions & 1 deletion src/main/scala/ai/metarank/model/Event.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ai.metarank.model

import cats.data.NonEmptyList
import io.circe.generic.extras.Configuration
import io.circe.{Codec, Decoder, DecodingFailure, Encoder}
import io.findify.featury.model.Timestamp
Expand Down Expand Up @@ -35,7 +36,7 @@ object Event {
user: UserId,
session: SessionId,
fields: List[Field] = Nil,
items: List[ItemRelevancy],
items: NonEmptyList[ItemRelevancy],
tenant: String = "default"
) extends FeedbackEvent

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/ai/metarank/model/FeatureScope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object FeatureScope {
val scope = Scope("item")

override def tags(event: Event): Traversable[Tag] = event match {
case e: RankingEvent => e.items.map(item => Tag(scope, item.id.value))
case e: RankingEvent => e.items.toList.map(item => Tag(scope, item.id.value))
case e: InteractionEvent => Some(Tag(scope, e.item.value))
case e: MetadataEvent => Some(Tag(scope, e.item.value))
}
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/ai/metarank/feature/RelevancyTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import ai.metarank.model.Event.ItemRelevancy
import ai.metarank.model.ItemId
import ai.metarank.model.MValue.SingleValue
import ai.metarank.util.TestRankingEvent
import cats.data.NonEmptyList
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class RelevancyTest extends AnyFlatSpec with Matchers {
it should "extract relevancy" in {
val feature = RelevancyFeature(RelevancySchema("rel"))
val event = TestRankingEvent(Nil).copy(items =
List(
NonEmptyList.of(
ItemRelevancy(ItemId("p1"), 1),
ItemRelevancy(ItemId("p2"), 2)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ClickthroughJoinFunctionTest extends AnyFlatSpec with Matchers with FlinkT
.process(new ClickthroughJoinFunction())
.executeAndCollect(10)
result.map(_.ranking.id.value) shouldBe List("1")
result.flatMap(_.ranking.items.map(_.id.value)) shouldBe List("p1", "p2", "p3")
result.flatMap(_.ranking.items.toList.map(_.id.value)) shouldBe List("p1", "p2", "p3")
result.flatMap(_.interactions.map(_.item.value)) shouldBe List("p2")
}
}
83 changes: 83 additions & 0 deletions src/test/scala/ai/metarank/mode/validate/ConfigValidatorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package ai.metarank.mode.validate

import ai.metarank.mode.validate.CheckResult.{FailedCheck, SuccessfulCheck}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ConfigValidatorTest extends AnyFlatSpec with Matchers {
it should "accept valid file" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe SuccessfulCheck
}
it should "fail on empty file" in {
ConfigValidator.check("") shouldBe a[FailedCheck]
}

it should "fail on missing interactions" in {
val yaml =
"""features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on empty interactions" in {
val yaml =
"""interactions:
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on interactions being non-object" in {
val yaml =
"""interactions: true
|features:
| - name: foo
| type: number
| scope: item
| source: metadata.foo""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on missing features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
| """.stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on empty features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features:""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}

it should "fail on non-obj features" in {
val yaml =
"""interactions:
| - name: click
| weight: 1
|features: true
|""".stripMargin
ConfigValidator.check(yaml) shouldBe a[FailedCheck]
}
}