Some extractor tests

ulitol97 committed Apr 26, 2022
1 parent d7f2012 commit a391455
Showing 5 changed files with 176 additions and 228 deletions.
187 changes: 3 additions & 184 deletions src/main/scala/Main.scala
@@ -12,7 +12,7 @@ import stream.extractors.list.ListExtractor
import trigger.ShapeMapFormat.COMPACT
import trigger.{ShapeMapFormat, TriggerShapeMap, TriggerTargetDeclarations, ValidationTrigger}
import utils.StreamUtils.*
-import utils.{FileUtils, IODebugOps, StreamUtils, Timer}
+import utils.{IODebugOps, StreamUtils, Timer}
import validation.Validator
import validation.configuration.ValidatorConfiguration

@@ -37,187 +37,6 @@ import scala.util.Random
*/
object Main extends IOApp {

override def run(args: List[String]): IO[ExitCode] = {
// Logic to create the files resource, in case we test the FileExtractor
val filesResource = FileUtils.createFiles(TestData.rdfItems)

// All program logic, wrapped in the files resource so we can test files
val program =
filesResource.use(
filePaths =>
for {
// Schema for validations
schema <- TestData.mkSchemaShaclIO()
// Trigger for validations
trigger = TestData.mkTriggerShacl
// Validator settings
validatorConfiguration = ValidatorConfiguration(schema, trigger, haltOnInvalid = true, haltOnErrored = true)
// RDF extractors: all types ready to be tested
// - List extractor
listExtractor = ListExtractor(
items = TestData.rdfItems,
format = DataFormat.TURTLE,
itemTimeout = None)

// - Kafka extractor
// kafkaExtractorConfiguration = KafkaExtractorConfiguration("rdf", "localhost", 9092)
kafkaExtractorConfiguration = KafkaExtractorConfiguration(
topic = "rdf",
server = "localhost",
port = 9092)
kafkaExtractor = KafkaExtractor[Unit, String](
configuration = kafkaExtractorConfiguration,
format = DataFormat.TURTLE,
itemTimeout = Some(5.seconds))

// - Files Extractor
fileExtractor = FileExtractor(
files = filePaths,
charset = Charsets.UTF8.value,
format = DataFormat.TURTLE,
itemTimeout = None)

// Validator instance
validator = new Validator(validatorConfiguration, listExtractor)
// Open validation stream
app <- validator.validate // Init
// .delayBy(10.minute)
// .repeat
.evalTap { _ => IO.println("- Received item") }
// .through(toConsole) // Print validation results (see overridden toString)
.handleErrorWith { err =>
Stream.eval(IO.println(
s"KABOOM (${err.getClass.getSimpleName}): " + err.getMessage))
}
.onFinalize(IO.println("Main finalized"))
.compile.drain

} yield app)


// Wrap app in timer
val app = for {
initTime <- IO(System.currentTimeMillis())
_ <- program
endTime <- IO((System.currentTimeMillis() - initTime) / 1000f)
closing <- IO.println(s"Time: ${endTime}s")
} yield ()
// Execute app
app >> IO.pure(ExitCode.Success)
}
}

//noinspection HttpUrlsUsage
// Example ShEx: https://rdfshape.weso.es/link/16478801915
// Example SHACL: https://rdfshape.weso.es/link/16490955842
private object TestData {

lazy val rdfItems: List[String] = Utils.mkRdfItems(testItems)

// RDF items for testing
private val testItems = 1


// TESTING WITH A LIST OF FILES:
// 1. Start a Stream with RDF texts
// 2. EvalMap the Stream, use Files[IO] to write any incoming item to a file
// 3. Finish the stream, store the list of files created.
// 4. Options:
// 1: - Create a custom resource (Resource.make) with the list of files
// Use Files[IO] to remove all files used after resource usage
// - Try Stream.resource method
// 2: - Just store the list of files and create a function capable of removing files
// - Use Stream.bracket, passing it the list of files and the function to remove them

// ShEx Schema used for testing
private val schemaShexStr: String =
"""|PREFIX ex: <http://example.org/>
|PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
|
|# Filters of a valid sensor reading
|ex:ValidReading {
| ex:readingDatetime xsd:dateTime ; # Has a VALID timestamp
| ex:readingTemperature xsd:decimal MININCLUSIVE 18 MAXINCLUSIVE 20 + ; # 1+ readings in range 18-20
| ex:status [ "OK" "RUNNING" ] # Status must be one of
|}
|""".stripMargin.strip

// SHACL Schema used for testing
private val schemaShaclStr: String =
"""|@prefix sh: <http://www.w3.org/ns/shacl#> .
|@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
|@prefix ex: <http://example.org/> .
|
|# Filters of a valid sensor reading
|ex:ValidReading a sh:NodeShape ;
| sh:targetClass ex:sensorReading ;
| sh:property [
| sh:path ex:readingDatetime ;
| sh:datatype xsd:dateTime ;
| ] ;
| sh:property [
| sh:path ex:readingTemperature ;
| sh:datatype xsd:decimal ;
| sh:minInclusive 18;
| sh:maxInclusive 20 ;
| sh:minCount 1 ; # 1+ readings
| ] ;
| sh:property [
| sh:path ex:status ;
| sh:datatype xsd:string ;
| sh:pattern "OK|RUNNING" ; # Range of states
| sh:maxCount 1 ; # 1 single status
| ] .
|""".stripMargin.strip

// Validation trigger (ShapeMap) for testing
private val shapeMapStr = """ex:reading@ex:ValidReading""".stripMargin

def mkSchemaShexIO(schemaText: String = schemaShexStr, format: String = "ShExC"): IO[Schema] =
Schemas.fromString(schemaText, format, Schemas.shEx.name)

def mkSchemaShaclIO(schemaText: String = schemaShaclStr): IO[Schema] =
Schemas.fromString(schemaText, "Turtle", Schemas.shaclex.name)

def mkTriggerShex(shapeMapText: String = shapeMapStr, format: ShapeMapFormat = COMPACT): ValidationTrigger =
TriggerShapeMap(shapeMapText, format)


// Validation trigger (target declarations) for testing
def mkTriggerShacl: ValidationTrigger =
TriggerTargetDeclarations()
}

//noinspection HttpUrlsUsage
private object Utils {

//noinspection SpellCheckingInspection
private val dateFormatter = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

// Make a list of RDF strings
def mkRdfItems(size: Int, min: Double = 16, max: Double = 22.5): List[String] =
(0 until size).map(_ => mkRdfItem(min, max)).toList

// RDF Strings used for testing
private def mkRdfItem(min: Double, max: Double): String = {
val temperature = Random.between(min, max)
mkRdfItem(temperature)
}

private def mkRdfItem(temperature: Double): String = {
val dateFormatted = dateFormatter.format(new Date())
// Format with US locale to have dots, not commas
val temperatureFormatted = String.format(Locale.US, "%.2f", temperature)

f"""
|@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
|@prefix ex: <http://example.org/> .
|
|ex:reading a ex:sensorReading ;
| ex:readingDatetime "$dateFormatted"^^xsd:dateTime ;
| ex:readingTemperature "$temperatureFormatted"^^xsd:decimal ;
| ex:status "OK" .
|""".stripMargin.strip

}
+override def run(args: List[String]): IO[ExitCode] =
+  IO.pure(ExitCode.Success)
}
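The comment block removed above ("TESTING WITH A LIST OF FILES") outlines two cleanup strategies for file-based tests; the committed FileUtils.createFiles follows the first one. Purely as illustration, a minimal sketch of that first option, assuming fs2 3.x's Files[IO] API and cats-effect 3 (the helper name is hypothetical, not the committed implementation):

// Sketch of option 1: acquire temp files holding each RDF text, release by
// deleting them. Assumes fs2 3.x (fs2.io.file.Files) and cats-effect 3;
// rdfItemsAsFiles is an illustrative name, not part of this commit.
import cats.effect.{IO, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.io.file.{Files, Path}

def rdfItemsAsFiles(items: List[String]): Resource[IO, List[Path]] =
  Resource.make(
    // Acquire: one temp file per RDF text, with the text written to it
    items.traverse { text =>
      Files[IO].createTempFile.flatTap { path =>
        Stream.emit(text)
          .through(fs2.text.utf8.encode)
          .through(Files[IO].writeAll(path))
          .compile.drain
      }
    }
  )(
    // Release: delete every file created during acquisition
    paths => paths.traverse_(path => Files[IO].deleteIfExists(path).void)
  )

Option 2 (Stream.bracket) would give the same cleanup guarantee at the stream level rather than wrapping usage in a Resource.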
127 changes: 127 additions & 0 deletions src/test/scala/extraction/ExtractorTests.scala
@@ -0,0 +1,127 @@
package org.ragna.comet
package extraction

import data.DataFormat
import data.DataFormat.*
import implicits.RDFElementImplicits.rdfFromString
import schema.ShExSchemaFormat
import schema.ShExSchemaFormat.*
import stream.extractors.StreamExtractor
import stream.extractors.file.FileExtractor
import stream.extractors.list.ListExtractor
import trigger.ShapeMapFormat.*
import trigger.TriggerModeType.{SHAPEMAP, TARGET_DECLARATIONS}
import trigger.{ShapeMapFormat, TriggerModeType, ValidationTrigger}
import utils.Samples.StreamSamples.mkSingleValidationResult
import utils.{FileUtils, Samples}
import validation.Validator
import validation.configuration.ValidatorConfiguration
import validation.result.ResultStatus.*
import validation.result.ValidationResult

import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import es.weso.schema.Schema
import org.scalatest.freespec.AsyncFreeSpec
import org.scalatest.matchers.should.Matchers

/**
* Test suite checking that the available basic data extractors work
*
* The testing goes as follows:
*
* - Some trivial, valid RDF/Schema combinations will be produced,
* complying with the app's testing framework (see [[Samples]])
*
* - This same data will be consumed by a validator, but each time the
* validator will be fed by a different extractor, to make sure all
* extractors work equally and are correctly abstracted
*
* - It will be checked that the extractor produces as many results as the
* inputs it was given, and that all results are VALID rather than ERRORED
*
* Tests are nested as follows to cover all possibilities:
*
* - Per extractor type (List extractor, File extractor, Kafka extractor)
*
* @note In these tests we take the validator's correctness for granted
* and focus solely on the extractors
* @note For obvious reasons, the Kafka extractor can't be exercised in a
* unit test, so it is mocked with dummy data
*/
//noinspection RedundantDefaultArgument
class ExtractorTests extends AsyncFreeSpec with AsyncIOSpec with Matchers {

/**
* Number of RDF elements to generate and feed to each extractor
*/
private val numberOfItems = 5

"LIST extractor" - {
// Trivial example with a list extractor, the validation goes OK
"works" in {
val validationResults: IO[List[ValidationResult]] =
for {
// Data, schema, trigger
schema <- Samples.SchemaSamples.mkSchemaShExIO()
trigger = Samples.TriggerSamples.mkTriggerShex()
rdfItems = Samples.RdfSamples.mkRdfItems(numberOfItems, TURTLE)

// List extractor
extractor = ListExtractor(rdfItems, DataFormat.TURTLE)

// Validator init
configuration = ValidatorConfiguration(schema, trigger)
validator = new Validator(configuration, extractor)

results: List[ValidationResult] <- validator.validate.compile.toList
} yield results

// Same amount of outputs, all VALID
validationResults.asserting(results => {
results.length shouldBe numberOfItems
results.forall(_.status == VALID) shouldBe true
})
}
}

"FILE extractor" - {
// Trivial example with a file extractor, the validation goes OK
"works" in {
// Create a resource with temporary files containing RDF
// The files will be read and eventually deleted
val rdfItems = Samples.RdfSamples.mkRdfItems(numberOfItems, TURTLE)
val filesResource = FileUtils.createFiles(rdfItems)

val validationResults: IO[List[ValidationResult]] = filesResource.use(
filePaths =>
for {
// Schema, trigger
schema <- Samples.SchemaSamples.mkSchemaShExIO()
trigger = Samples.TriggerSamples.mkTriggerShex()

// File extractor
extractor = FileExtractor(filePaths, format = TURTLE)

// Validator init
configuration = ValidatorConfiguration(schema, trigger)
validator = new Validator(configuration, extractor)

results: List[ValidationResult] <- validator.validate.compile.toList
} yield results
)

// Same amount of outputs, all VALID
validationResults.asserting(results => {
results.length shouldBe numberOfItems
results.forall(_.status == VALID) shouldBe true
})
}
}

"KAFKA extractor" - {
"works" in {
IO(1).asserting(_ shouldBe 1)
}
}
}
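Since the KAFKA test above is only a placeholder, here is an illustrative sketch of how the same validator wiring would consume from Kafka once a broker (or an embedded substitute) is reachable. The constructor calls mirror the KafkaExtractor usage deleted from Main.scala above; the kafka import path and topic setup are assumptions, not part of this commit:

// Illustrative only, not part of the committed suite.
// Package path below is assumed from the other extractor packages.
import stream.extractors.kafka.{KafkaExtractor, KafkaExtractorConfiguration}
import scala.concurrent.duration.*

val kafkaValidationResults: IO[List[ValidationResult]] =
  for {
    // Schema and trigger, as in the LIST and FILE tests
    schema <- Samples.SchemaSamples.mkSchemaShExIO()
    trigger = Samples.TriggerSamples.mkTriggerShex()

    // Kafka extractor; assumes RDF items are already produced to "rdf"
    kafkaConfiguration = KafkaExtractorConfiguration(
      topic = "rdf",
      server = "localhost",
      port = 9092)
    extractor = KafkaExtractor[Unit, String](
      configuration = kafkaConfiguration,
      format = DataFormat.TURTLE,
      itemTimeout = Some(5.seconds))

    // Validator init, identical to the other tests
    configuration = ValidatorConfiguration(schema, trigger)
    validator = new Validator(configuration, extractor)

    results <- validator.validate.compile.toList
  } yield results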
@@ -1,7 +1,7 @@
package org.ragna.comet
package utils

-import stream.extractors.file.Charsets
+import stream.extractors.file.{Charsets, FileExtractor}

import cats.effect.{IO, Resource}
import fs2.io.file.*
@@ -13,6 +13,8 @@ import scala.concurrent.duration.*
/**
* Utilities to work with files, in the context of FS2 Streams and using
* FS2 files API
+*
+* Used for testing the [[FileExtractor]]
*/
object FileUtils {

@@ -25,7 +27,7 @@ object FileUtils {
* @return A cats Resource for using the created files as needed and making
* sure that all created files are removed after usage
*
-* @note Publicly exposed, to make sure people using the API manage their
+* @note Publicly exposed, to make sure people using the API manages their
* files as resources
*/
def createFiles(
4 changes: 1 addition & 3 deletions src/test/scala/utils/Samples.scala
@@ -1,8 +1,6 @@
package org.ragna.comet
package utils

-import TestData.shapeMapStr
-import Utils.{dateFormatter, mkRdfItem}
import data.DataFormat
import data.DataFormat.*
import exception.stream.timed.StreamTimeoutException
@@ -183,7 +181,7 @@ object Samples {
* @return A list of Strings, each one being an RDF text with a single
* sensor temperature reading
*/
-def mkRdfItems(size: Int, format: DataFormat,
+def mkRdfItems(size: Int, format: DataFormat = TURTLE,
min: Double = minValidTemperature,
max: Double = maxValidTemperature): List[String] =
(0 until size).map(_ => mkRdfItem(format, min, max)).toList