Merge pull request #36 from nfdi4cat/issue11
Add validation with profile to excel_to_rdf
peterphilips committed May 26, 2022
2 parents ce04c8c + 346a391 commit ea00b64
Showing 1 changed file with 118 additions and 80 deletions.
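
A minimal usage sketch, not taken from the repository: assuming the package is importable as voc4cat.convert and using a hypothetical workbook name, passing the new validate=True keyword makes excel_to_rdf run the profile validation added in this commit before the RDF is written.

from pathlib import Path

from voc4cat.convert import excel_to_rdf

# validate=True runs validate_with_profile() on the generated vocabulary graph
# before it is serialized.
dest = excel_to_rdf(
    Path("vocabulary.xlsx"),  # hypothetical workbook path
    profile="vocpub",         # default profile, per the diff below
    output_type="file",
    output_format="turtle",
    error_level=2,            # 1: fail on info and above, 2: warnings and above, 3: violations only
    message_level=1,
    validate=True,
)
print(dest)                   # path of the written .ttl file

If the check fails, a ConversionError is raised and no file is written.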
198 changes: 118 additions & 80 deletions src/voc4cat/convert.py
@@ -3,11 +3,12 @@
import sys

from pathlib import Path
from typing import Dict, Literal
from typing import Dict, Literal, Union

import pyshacl
from colorama import Fore, Style
from pydantic.error_wrappers import ValidationError
from pyshacl.pytypes import GraphLike

try:
    import models
@@ -72,12 +73,102 @@
TEMPLATE_VERSION = None


def validate_with_profile(
data_graph: Union[GraphLike, str, bytes],
profile="vocpub",
error_level=1,
message_level=1,
log_file=None,
):
if profile not in profiles.PROFILES.keys():
raise ValueError(
"The profile chosen for conversion must be one of '{}'. 'vocpub' is default".format(
"', '".join(profiles.PROFILES.keys())
)
)
allow_warnings = True if error_level > 1 else False

# validate the RDF file
conforms, results_graph, results_text = pyshacl.validate(
data_graph,
shacl_graph=str(Path(__file__).parent / "validator.vocpub.ttl"),
allow_warnings=allow_warnings,
)

logging_level = logging.INFO

if message_level == 3:
logging_level = logging.ERROR
elif message_level == 2:
logging_level = logging.WARNING

if log_file:
logging.basicConfig(
level=logging_level, format="%(message)s", filename=log_file, force=True
)
else:
logging.basicConfig(level=logging_level, format="%(message)s")

info_list = []
warning_list = []
violation_list = []

from rdflib.namespace import RDF, SH

for report in results_graph.subjects(RDF.type, SH.ValidationReport):
for result in results_graph.objects(report, SH.result):
result_dict = {}
for p, o in results_graph.predicate_objects(result):
if p == SH.focusNode:
result_dict["focusNode"] = str(o)
elif p == SH.resultMessage:
result_dict["resultMessage"] = str(o)
elif p == SH.resultSeverity:
result_dict["resultSeverity"] = str(o)
elif p == SH.sourceConstraintComponent:
result_dict["sourceConstraintComponent"] = str(o)
elif p == SH.sourceShape:
result_dict["sourceShape"] = str(o)
elif p == SH.value:
result_dict["value"] = str(o)
result_message_formatted = log_msg(result_dict, log_file)
result_message = log_msg(result_dict, "placeholder")
if result_dict["resultSeverity"] == str(SH.Info):
logging.info(result_message_formatted)
info_list.append(result_message)
elif result_dict["resultSeverity"] == str(SH.Warning):
logging.warning(result_message_formatted)
warning_list.append(result_message)
elif result_dict["resultSeverity"] == str(SH.Violation):
logging.error(result_message_formatted)
violation_list.append(result_message)

error_messages = []

if error_level == 3:
error_messages = violation_list
elif error_level == 2:
error_messages = warning_list + violation_list
else: # error_level == 1
error_messages = info_list + warning_list + violation_list

if len(error_messages) > 0:
raise ConversionError(
f"The file you supplied is not valid according to the {profile} profile."
)


def excel_to_rdf(
file_to_convert_path: Path,
profile="vocpub",
sheet_name=None,
output_type: Literal["file", "string", "graph"] = "file",
output_file_path=None,
output_format: Literal["turtle", "xml", "json-ld"] = "turtle",
error_level=1,
message_level=1,
log_file=None,
validate=False,
):
"""Converts a sheet within an Excel workbook to an RDF file"""
wb = load_workbook(file_to_convert_path)
@@ -139,12 +230,24 @@ def excel_to_rdf(
raise ConversionError(f"ConceptScheme processing error: {e}")

# Build the total vocab
v = models.Vocabulary(concept_scheme=cs, concepts=concepts, collections=collections)
vocab_graph = models.Vocabulary(
concept_scheme=cs, concepts=concepts, collections=collections
).to_graph()

if validate:
validate_with_profile(
vocab_graph,
profile=profile,
error_level=error_level,
message_level=message_level,
log_file=log_file,
)

# Write out the file
if output_type == "graph":
return v.to_graph()
return vocab_graph
elif output_type == "string":
return v.to_graph().serialize(format=output_format)
return vocab_graph.serialize(format=output_format)
else: # output_format == "file":
if output_file_path is not None:
dest = output_file_path
@@ -156,7 +259,7 @@
else:
suffix = ".ttl"
dest = file_to_convert_path.with_suffix(suffix)
v.to_graph().serialize(destination=str(dest), format=output_format)
vocab_graph.serialize(destination=str(dest), format=output_format)
return dest


@@ -177,84 +280,14 @@ def rdf_to_excel(
"', '".join(RDF_FILE_ENDINGS.keys())
)
)
if profile not in profiles.PROFILES.keys():
raise ValueError(
"The profile chosen for conversion must be one of '{}'. 'vocpub' is default".format(
"', '".join(profiles.PROFILES.keys())
)
)

allow_warnings = True if error_level > 1 else False

# validate the RDF file
conforms, results_graph, results_text = pyshacl.validate(
validate_with_profile(
str(file_to_convert_path),
shacl_graph=str(Path(__file__).parent / "validator.vocpub.ttl"),
allow_warnings=allow_warnings,
profile=profile,
error_level=error_level,
message_level=message_level,
log_file=log_file,
)

logging_level = logging.INFO

if message_level == 3:
logging_level = logging.ERROR
elif message_level == 2:
logging_level = logging.WARNING

if log_file:
logging.basicConfig(
level=logging_level, format="%(message)s", filename=log_file, force=True
)
else:
logging.basicConfig(level=logging_level, format="%(message)s")

info_list = []
warning_list = []
violation_list = []

from rdflib.namespace import RDF, SH

for report in results_graph.subjects(RDF.type, SH.ValidationReport):
for result in results_graph.objects(report, SH.result):
result_dict = {}
for p, o in results_graph.predicate_objects(result):
if p == SH.focusNode:
result_dict["focusNode"] = str(o)
elif p == SH.resultMessage:
result_dict["resultMessage"] = str(o)
elif p == SH.resultSeverity:
result_dict["resultSeverity"] = str(o)
elif p == SH.sourceConstraintComponent:
result_dict["sourceConstraintComponent"] = str(o)
elif p == SH.sourceShape:
result_dict["sourceShape"] = str(o)
elif p == SH.value:
result_dict["value"] = str(o)
result_message_formatted = log_msg(result_dict, log_file)
result_message = log_msg(result_dict, "placeholder")
if result_dict["resultSeverity"] == str(SH.Info):
logging.info(result_message_formatted)
info_list.append(result_message)
elif result_dict["resultSeverity"] == str(SH.Warning):
logging.warning(result_message_formatted)
warning_list.append(result_message)
elif result_dict["resultSeverity"] == str(SH.Violation):
logging.error(result_message_formatted)
violation_list.append(result_message)

error_messages = []

if error_level == 3:
error_messages = violation_list
elif error_level == 2:
error_messages = warning_list + violation_list
else: # error_level == 1
error_messages = info_list + warning_list + violation_list

if len(error_messages) > 0:
raise ConversionError(
f"The file you supplied is not valid according to the {profile} profile."
)

# the RDF is valid so extract data and create Excel
from rdflib import Graph
from rdflib.namespace import DCAT, DCTERMS, PROV, RDF, RDFS, SKOS, OWL
@@ -586,10 +619,15 @@ def main(args=None):
try:
o = excel_to_rdf(
args.file_to_convert,
profile=args.profile,
sheet_name=args.sheet,
output_type=args.outputtype,
output_file_path=args.outputfile,
output_format=args.outputformat,
error_level=int(args.errorlevel),
message_level=int(args.messagelevel),
log_file=args.logfile,
validate=True,
)
if args.outputtype == "string":
print(o)
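rdf_to_excel now delegates to the same helper, so a standalone call on an existing RDF file might look like the sketch below; the file and log names are placeholders, and ConversionError is assumed to be importable from voc4cat.convert because convert.py raises it.

from voc4cat.convert import ConversionError, validate_with_profile

try:
    validate_with_profile(
        "concept-scheme.ttl",  # placeholder path; a str, bytes or rdflib graph is accepted
        profile="vocpub",
        error_level=1,         # count info, warning and violation results as failures
        message_level=2,       # log warnings and errors only
        log_file="shacl.log",  # optional; falls back to standard logging output when omitted
    )
except ConversionError as err:
    print(err)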
