Skip to content

Commit

Permalink
Ported from demo project
Browse files Browse the repository at this point in the history
  • Loading branch information
ulitol97 committed Apr 7, 2022
0 parents commit fe26dd1
Show file tree
Hide file tree
Showing 37 changed files with 2,281 additions and 0 deletions.
67 changes: 67 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Emacs
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*

.bloop
.metals
metals.sbt
.bsp

# Java
*.class
*.log

# sbt specific
dist/*
target/
project/target/

lib_managed/
src_managed/
project/boot/
project/plugins/project/

# Scala-IDE specific
.scala_dependencies

.settings
.settings/*
.project
.classpath
.cache
/bin
settings.json
.vscode
.ensime_cache
.ensime
.idea

node_modules/
.grunt

# Logs
logs
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Runtime data
pids
*.pid
*.seed
*.pid.lock
.npm
*.tgz

# Generated documentation
scaladoc

# Other
.directory

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
![comet@m](https://user-images.githubusercontent.com/35763574/159525749-830e7605-829a-4923-ab94-bd94496d24d8.png)

# comet
A Scala application for validating RDF data streams with Shape Expressions (ShEx) and SHACL.
66 changes: 66 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// See version in "version.sbt"
Global / name := "comet" // Friendly app name
ThisBuild / scalaVersion := "3.1.1"
// Silence lint warnings for keys that are set but not read in every scope
Global / excludeLintKeys ++= Set(name, idePackagePrefix)

lazy val comet = (project in file("."))
  .enablePlugins(BuildInfoPlugin)
  .settings(
    name := "comet",
    idePackagePrefix := Some("org.ragna.comet"),
    resolverSettings,
    libraryDependencies ++= Seq(
      catsEffect,
      fs2,
      fs2Kafka,
      shexs,
      shaclex,
      wesoUtils
    ),
    buildInfoSettings,
    // Fork the JVM when running so sbt's own classloader/streams don't
    // interfere with the app: https://stackoverflow.com/q/66372308/9744696
    Compile / run / fork := true
  )

// Aggregate resolver settings passed down to modules to resolve dependencies
// Helper to resolve dependencies from GitHub packages
lazy val resolverSettings = Seq(
  resolvers ++= Seq(
    Resolver.DefaultMavenRepository,
    Resolver.sonatypeRepo("snapshots")
  )
)

// Shared settings for the BuildInfo Plugin
// See https://github.com/sbt/sbt-buildinfo
lazy val buildInfoSettings = Seq(
  buildInfoKeys := Seq[BuildInfoKey](
    name,
    version,
    scalaVersion,
    sbtVersion
  ),
  buildInfoPackage := "buildinfo",
  buildInfoObject := "BuildInfo"
)

// Core dependencies
// Version of cats-effect (renamed from "catsVersion": 3.3.9 is a
// cats-effect release, not a cats-core one)
lazy val catsEffectVersion = "3.3.9"
// FS2 dependencies
lazy val fs2Version = "3.2.5"
lazy val fs2KafkaVersion = "3.0.0-M6"
// WESO dependencies
lazy val shaclexVersion = "0.2.2"
lazy val shexsVersion = "0.2.2"
// NOTE(review): currently unused — no dependency references it below; confirm
// whether umlShaclex is planned before removing
lazy val umlShaclZexVersion = "0.0.82"
lazy val wesoUtilsVersion = "0.2.4"
// -------------------------------------------------------------------------- //
// Core dependencies
lazy val catsEffect = "org.typelevel" %% "cats-effect" % catsEffectVersion
// FS2 dependencies
lazy val fs2 = "co.fs2" %% "fs2-core" % fs2Version
lazy val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % fs2KafkaVersion
// WESO dependencies
lazy val shexs = "es.weso" %% "shexs" % shexsVersion
lazy val shaclex = "es.weso" %% "shaclex" % shaclexVersion
lazy val wesoUtils = "es.weso" %% "utilstest" % wesoUtilsVersion
Binary file added img/comet@l.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/comet@m.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added img/comet@xl.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version = 1.6.2
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Exposes IDE settings keys (e.g. "idePackagePrefix" used in build.sbt)
addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.1")
// Generates the "buildinfo.BuildInfo" object configured in build.sbt
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
218 changes: 218 additions & 0 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package org.ragna.comet

import data.DataFormat
import exception.stream.validations.*
import implicits.RDFElementImplicits.rdfFromString
import model.rdf.RDFElement
import stream.StreamSource
import stream.extractors.StreamExtractor
import stream.extractors.file.{Charsets, FileExtractor}
import stream.extractors.kafka.{KafkaExtractor, KafkaExtractorConfiguration}
import stream.extractors.list.ListExtractor
import trigger.ShapeMapFormat.COMPACT
import trigger.{ShapeMapFormat, TriggerShapeMap, TriggerTargetDeclarations, ValidationTrigger}
import utils.StreamUtils.*
import utils.{FileUtils, IODebugOps, StreamUtils, Timer}
import validation.Validator
import validation.configuration.ValidatorConfiguration

import cats.effect.*
import cats.syntax.functor.*
import es.weso.rdf.PrefixMap
import es.weso.rdf.jena.RDFAsJenaModel
import es.weso.schema.{Schema, Schemas, ShapeMapTrigger, ValidationTrigger as ValidationTriggerShaclex}
import fs2.io.file.*
import fs2.{Pipe, Stream}

import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}
import scala.concurrent.duration.*
import scala.util.Random

/**
* Demo entry point.
* Shall initialize the validation, get the validation Stream
* and print it
*
*/
object Main extends IOApp {

  /**
   * Demo entry point: builds the test files resource, assembles a validation
   * stream and runs it, printing each result marker and a final elapsed time.
   *
   * @param args Command-line arguments (unused here)
   * @return [[ExitCode.Success]] once the demo program finishes
   */
  override def run(args: List[String]): IO[ExitCode] = {
    // Logic to create the files-resource in case the test FileExtractor
    val filesResource = FileUtils.createFiles(TestData.rdfItems)

    // All program logic, wrapped in the files resource so we can test files
    val program =
      filesResource.use(
        filePaths =>
          for {
            // Schema for validations
            schema <- TestData.mkSchemaShexIO()
            // Trigger for validations
            trigger = TestData.mkTriggerShex()
            // Validator settings
            validatorConfiguration = ValidatorConfiguration(schema, trigger, haltOnErrored = true)
            // RDF extractors: all types ready to be tested
            // NOTE(review): only listExtractor is handed to the Validator
            // below; the Kafka and file extractors are built but unused —
            // presumably kept so the demo can be switched between sources
            // - List extractor
            listExtractor = ListExtractor(
              items = TestData.rdfItems,
              format = DataFormat.TURTLE,
              itemTimeout = None)

            // - Kafka extractor
            // kafkaExtractorConfiguration = KafkaExtractorConfiguration("rdf", "localhost", 9092)
            kafkaExtractorConfiguration = KafkaExtractorConfiguration(
              topic = "rdf",
              server = "localhost",
              port = 9092)
            kafkaExtractor = KafkaExtractor[Unit, String](
              configuration = kafkaExtractorConfiguration,
              format = DataFormat.TURTLE,
              itemTimeout = Some(5.seconds))

            // - Files Extractor
            fileExtractor = FileExtractor(
              files = filePaths,
              charset = Charsets.UTF8.value,
              format = DataFormat.TURTLE,
              itemTimeout = None)

            // Validator instance
            validator = Validator(validatorConfiguration, listExtractor)
            // Open validation stream
            app <- validator.validate // Init
              // .delayBy(10.minute)
              .repeat
              .evalTap { _ => IO.println("- Received item") }
              // .through(toConsole) // Print validation results (see overridden toString)
              .handleErrorWith { err =>
                Stream.eval(IO.println(
                  s"KABOOM (${err.getClass.getSimpleName}): " + err.getMessage))
              }
              .onFinalize(IO.println("Main finalized"))
              .compile.drain

          } yield app)


    // Wrap app in timer: measure wall-clock seconds around the whole program
    val app = for {
      initTime <- IO(System.currentTimeMillis())
      _ <- program
      endTime <- IO((System.currentTimeMillis() - initTime) / 1000f)
      // NOTE(review): "closing" is never read afterwards; "_ <-" would do
      closing <- IO.println(s"Time: ${endTime}s")
    } yield ()
    // Execute app
    app >> IO.pure(ExitCode.Success)
  }
}

//noinspection HttpUrlsUsage
// Example ShEx: https://rdfshape.weso.es/link/16478801915
// Example SHACL: https://rdfshape.weso.es/link/16490955842
/**
 * Test fixtures for the demo: ShEx/SHACL schemas, validation triggers and
 * randomly generated RDF items describing sensor readings.
 */
private object TestData {

  // RDF (Turtle) documents fed to the extractors; generated once, lazily
  lazy val rdfItems: List[String] = Utils.mkRdfItems(testItems)

  // RDF items for testing (how many random readings to generate)
  private val testItems = 1


  // TESTING WITH A LIST OF FILES:
  // 1. Start a Stream with RDF texts
  // 2. EvalMap the Stream, use Files[IO] to write any incoming item to a file
  // 3. Finish the stream, store the list of files created.
  // 4. Options:
  //    1: - Create a custom resource (Resource.make) with the list of files
  //         Use Files[IO] to remove all files used after resource usage
  //       - Try Stream.resource method
  //    2: - Just store the list of files and create a function capable of removing files
  //       - Use Stream.bracket, passing it the list of files and the function to remove them

  // ShEx Schema used for testing
  private val schemaShexStr: String =
    """|PREFIX ex: <http://example.org/>
       |PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
       |
       |# Filters of a valid sensor reading
       |ex:ValidReading {
       | ex:readingDatetime xsd:dateTime ; # Has a VALID timestamp
       | ex:readingTemperature xsd:decimal MININCLUSIVE 18 MAXINCLUSIVE 20 + ; # 1+ readings in range 18-20
       | ex:status [ "OK" "RUNNING" ] # Status must be one of
       |}
       |""".stripMargin.strip

  // SHACL Schema used for testing
  private val schemaShaclStr: String =
    """|@prefix sh: <http://www.w3.org/ns/shacl#> .
       |@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
       |@prefix ex: <http://example.org/> .
       |
       |# Filters of a valid sensor reading
       |ex:ValidReading a sh:NodeShape ;
       | sh:targetClass ex:sensorReading ;
       | sh:property [
       |   sh:path ex:readingDatetime ;
       |   sh:datatype xsd:dateTime ;
       | ] ;
       | sh:property [
       |   sh:path ex:readingTemperature ;
       |   sh:datatype xsd:decimal ;
       |   sh:minInclusive 18;
       |   sh:maxInclusive 20 ;
       |   sh:minCount 1 ; # 1+ readings
       | ] ;
       | sh:property [
       |   sh:path ex:status ;
       |   sh:datatype xsd:string ;
       |   sh:pattern "OK|RUNNING" ; # Range of states
       |   sh:maxCount 1 ; # 1 single status
       | ] .
       |""".stripMargin.strip

  // Validation trigger (ShapeMap) for testing
  // NOTE(review): stripMargin is a no-op here (single line, no margin chars)
  private val shapeMapStr = """ex:reading@ex:ValidReading""".stripMargin

  // Parse the ShEx schema text into a WESO Schema (effectful: parsing may fail)
  def mkSchemaShexIO(schemaText: String = schemaShexStr, format: String = "ShExC"): IO[Schema] =
    Schemas.fromString(schemaText, format, Schemas.shEx.name)

  // Parse the SHACL (Turtle) schema text into a WESO Schema
  def mkSchemaShaclIO(schemaText: String = schemaShaclStr): IO[Schema] =
    Schemas.fromString(schemaText, "Turtle", Schemas.shaclex.name)

  // Build a ShapeMap-based validation trigger (used with ShEx schemas)
  def mkTriggerShex(shapeMapText: String = shapeMapStr, format: ShapeMapFormat = COMPACT): ValidationTrigger =
    TriggerShapeMap(shapeMapText, format)


  // Validation trigger (target declarations) for testing (used with SHACL)
  def mkTriggerShacl: ValidationTrigger =
    TriggerTargetDeclarations()
}

//noinspection HttpUrlsUsage
/**
 * Helpers to generate random RDF (Turtle) sensor readings for the demo.
 */
private object Utils {

  //noinspection SpellCheckingInspection
  // xsd:dateTime-compatible timestamp format.
  // NOTE(review): SimpleDateFormat is not thread-safe; fine while items are
  // generated sequentially — revisit before generating items concurrently.
  private val dateFormatter = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

  /**
   * Make a list of random RDF documents (as Turtle strings).
   *
   * @param size Number of documents to generate (0 or negative yields Nil)
   * @param min  Minimum (inclusive) random temperature
   * @param max  Maximum (exclusive) random temperature; must be > min
   * @return `size` independent sensor-reading documents
   */
  def mkRdfItems(size: Int, min: Double = 16, max: Double = 22.5): List[String] =
    List.fill(size)(mkRdfItem(min, max))

  // Build a single sensor-reading document with the current timestamp and a
  // random temperature in [min, max)
  private def mkRdfItem(min: Double, max: Double): String = {
    val dateFormatted = dateFormatter.format(new Date())
    // Format with US locale to have dots, not commas (xsd:decimal syntax)
    val temperature = String.format(Locale.US, "%.2f", Random.between(min, max))

    // "s" interpolator: plain substitution, no printf-style specifiers needed
    s"""
       |@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
       |@prefix ex: <http://example.org/> .
       |
       |ex:reading a ex:sensorReading ;
       | ex:readingDatetime "$dateFormatted"^^xsd:dateTime ;
       | ex:readingTemperature "$temperature"^^xsd:decimal ;
       | ex:status "OK" .
       |""".stripMargin.strip

  }
}
14 changes: 14 additions & 0 deletions src/main/scala/data/DataFormat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.ragna.comet
package data

/**
 * RDF data formats accepted by the application as input data
 *
 * @param name Canonical format name as passed to the underlying RDF parser
 *             (presumably Jena via WESO libraries — confirm at use sites)
 */
enum DataFormat(val name: String) {
  case TURTLE extends DataFormat("Turtle")
  case RDFXML extends DataFormat("RDF/XML")
  case JSONLD extends DataFormat("JSON-LD")
  case NTRIPLES extends DataFormat("N-Triples")
  case RDFJSON extends DataFormat("RDF/JSON")
  case TRIG extends DataFormat("TriG")
}
Loading

0 comments on commit fe26dd1

Please sign in to comment.