def validate_with_profile(
    data_graph: Union[GraphLike, str, bytes],
    profile="vocpub",
    error_level=1,
    message_level=1,
    log_file=None,
):
    """Validate RDF data against a SHACL profile and raise on failure.

    Parameters
    ----------
    data_graph:
        The RDF to validate — an rdflib-compatible graph, or a path /
        serialized-RDF string / bytes (whatever ``pyshacl.validate``
        accepts).
    profile:
        Name of the validation profile; must be a key of
        ``profiles.PROFILES``. Defaults to ``"vocpub"``.
    error_level:
        Which severities are fatal: 1 = infos, warnings and violations;
        2 = warnings and violations; 3 = violations only.
    message_level:
        Minimum severity that gets logged: 1 = info and above,
        2 = warnings and above, 3 = errors only.
    log_file:
        Optional path; when given, validation messages are written there
        instead of the default logging stream.

    Raises
    ------
    ValueError
        If *profile* is not a known profile name.
    ConversionError
        If the data does not conform at the requested *error_level*.
    """
    if profile not in profiles.PROFILES:
        raise ValueError(
            "The profile chosen for conversion must be one of '{}'. "
            "'vocpub' is default".format("', '".join(profiles.PROFILES.keys()))
        )
    # With allow_warnings set, pyshacl does not count warnings as
    # non-conformance; only relevant when error_level > 1.
    allow_warnings = error_level > 1

    # Validate the RDF data. Only the results graph is inspected below,
    # so the boolean verdict and the text report are discarded.
    # TODO(review): the shapes file is hard-coded to the vocpub validator,
    # so `profile` currently only affects the error message — confirm
    # whether per-profile shape files exist and should be selected here.
    _conforms, results_graph, _results_text = pyshacl.validate(
        data_graph,
        shacl_graph=str(Path(__file__).parent / "validator.vocpub.ttl"),
        allow_warnings=allow_warnings,
    )

    logging_level = {3: logging.ERROR, 2: logging.WARNING}.get(
        message_level, logging.INFO
    )
    # force=True in both branches so repeated calls (e.g. one per file
    # converted in a session) can re-route logging; without it,
    # basicConfig is a silent no-op after the first call and a previously
    # configured log file would keep capturing messages.
    if log_file:
        logging.basicConfig(
            level=logging_level, format="%(message)s", filename=log_file, force=True
        )
    else:
        logging.basicConfig(level=logging_level, format="%(message)s", force=True)

    info_list = []
    warning_list = []
    violation_list = []

    from rdflib.namespace import RDF, SH

    # Map SHACL result predicates to the keys used in the message dict.
    predicate_to_key = {
        SH.focusNode: "focusNode",
        SH.resultMessage: "resultMessage",
        SH.resultSeverity: "resultSeverity",
        SH.sourceConstraintComponent: "sourceConstraintComponent",
        SH.sourceShape: "sourceShape",
        SH.value: "value",
    }
    # Route each severity to its logger call and collection bucket.
    severity_sinks = {
        str(SH.Info): (logging.info, info_list),
        str(SH.Warning): (logging.warning, warning_list),
        str(SH.Violation): (logging.error, violation_list),
    }

    for report in results_graph.subjects(RDF.type, SH.ValidationReport):
        for result in results_graph.objects(report, SH.result):
            result_dict = {}
            for p, o in results_graph.predicate_objects(result):
                key = predicate_to_key.get(p)
                if key is not None:
                    result_dict[key] = str(o)
            result_message_formatted = log_msg(result_dict, log_file)
            # Second copy with a "placeholder" destination — presumably to
            # get the plain (unformatted) variant for the error summary;
            # TODO confirm against log_msg's implementation.
            result_message = log_msg(result_dict, "placeholder")
            # NOTE: indexing (not .get) preserves the original behavior of
            # raising KeyError on a result without a severity.
            sink = severity_sinks.get(result_dict["resultSeverity"])
            if sink is not None:
                log_fn, bucket = sink
                log_fn(result_message_formatted)
                bucket.append(result_message)

    # error_level selects which severities count as fatal.
    if error_level == 3:
        error_messages = violation_list
    elif error_level == 2:
        error_messages = warning_list + violation_list
    else:  # error_level == 1
        error_messages = info_list + warning_list + violation_list

    if error_messages:
        raise ConversionError(
            f"The file you supplied is not valid according to the {profile} profile."
        )
'vocpub' is default".format( - "', '".join(profiles.PROFILES.keys()) - ) - ) - allow_warnings = True if error_level > 1 else False - - # validate the RDF file - conforms, results_graph, results_text = pyshacl.validate( + validate_with_profile( str(file_to_convert_path), - shacl_graph=str(Path(__file__).parent / "validator.vocpub.ttl"), - allow_warnings=allow_warnings, + profile=profile, + error_level=error_level, + message_level=message_level, + log_file=log_file, ) - - logging_level = logging.INFO - - if message_level == 3: - logging_level = logging.ERROR - elif message_level == 2: - logging_level = logging.WARNING - - if log_file: - logging.basicConfig( - level=logging_level, format="%(message)s", filename=log_file, force=True - ) - else: - logging.basicConfig(level=logging_level, format="%(message)s") - - info_list = [] - warning_list = [] - violation_list = [] - - from rdflib.namespace import RDF, SH - - for report in results_graph.subjects(RDF.type, SH.ValidationReport): - for result in results_graph.objects(report, SH.result): - result_dict = {} - for p, o in results_graph.predicate_objects(result): - if p == SH.focusNode: - result_dict["focusNode"] = str(o) - elif p == SH.resultMessage: - result_dict["resultMessage"] = str(o) - elif p == SH.resultSeverity: - result_dict["resultSeverity"] = str(o) - elif p == SH.sourceConstraintComponent: - result_dict["sourceConstraintComponent"] = str(o) - elif p == SH.sourceShape: - result_dict["sourceShape"] = str(o) - elif p == SH.value: - result_dict["value"] = str(o) - result_message_formatted = log_msg(result_dict, log_file) - result_message = log_msg(result_dict, "placeholder") - if result_dict["resultSeverity"] == str(SH.Info): - logging.info(result_message_formatted) - info_list.append(result_message) - elif result_dict["resultSeverity"] == str(SH.Warning): - logging.warning(result_message_formatted) - warning_list.append(result_message) - elif result_dict["resultSeverity"] == str(SH.Violation): - 
logging.error(result_message_formatted) - violation_list.append(result_message) - - error_messages = [] - - if error_level == 3: - error_messages = violation_list - elif error_level == 2: - error_messages = warning_list + violation_list - else: # error_level == 1 - error_messages = info_list + warning_list + violation_list - - if len(error_messages) > 0: - raise ConversionError( - f"The file you supplied is not valid according to the {profile} profile." - ) - # the RDF is valid so extract data and create Excel from rdflib import Graph from rdflib.namespace import DCAT, DCTERMS, PROV, RDF, RDFS, SKOS, OWL @@ -582,10 +615,15 @@ def main(args=None): try: o = excel_to_rdf( args.file_to_convert, + profile=args.profile, sheet_name=args.sheet, output_type=args.outputtype, output_file_path=args.outputfile, output_format=args.outputformat, + error_level=int(args.errorlevel), + message_level=int(args.messagelevel), + log_file=args.logfile, + validate=True, ) if args.outputtype == "string": print(o)