In [13]:
from csv import DictReader
from pprint import PrettyPrinter
from rdflib import Graph, Literal, URIRef, RDF, RDFS, SKOS, SDO
from rdflib.namespace import Namespace, NamespaceManager

In [69]:
def splitCuri(curi):
    """Break a compact URI of the form ``prefix:name`` into its two parts.

    Returns the two-element list ``[prefix, name]``.
    Raises Exception when ``curi`` does not contain exactly one colon.
    """
    pieces = curi.split(":")
    # Anything other than two pieces means zero colons or more than one.
    if len(pieces) != 2:
        raise Exception("CURI should have one colon.")
    return pieces

def expandCuri(curi, namespaces):
    """Expand a compact URI into a full URI string.

    ``namespaces`` maps prefixes to namespace URI strings; the result is the
    namespace for the CURIE's prefix with the local name appended.
    Raises Exception when the CURIE is malformed (see splitCuri) or when its
    prefix is not a key of ``namespaces``.
    """
    prefix, local_name = splitCuri(curi)
    if prefix not in namespaces:
        raise Exception("Cannot expand curi with unknown prefix " + curi)
    return namespaces[prefix] + local_name
        
def curi2URIRef(curi, namespaces):
    """Return the rdflib URIRef for the compact URI ``curi``.

    The CURIE is expanded with expandCuri using the prefix -> URI mapping
    ``namespaces``; any exception raised there propagates unchanged.
    """
    return URIRef(expandCuri(curi, namespaces))

def unpackProperties(self, string):
    """Turn a string with properties separated by linebreaks into a list of properties.

    ``self`` is accepted but not used. The split is on '\\n' only, so an empty
    string yields [''] and a trailing linebreak yields a final empty entry.
    """
    return string.split('\n')
        


In [98]:
class Diag2RDFSConverter():
    """Convert a CSV export of a diagram into an RDF Schema graph.

    Rows whose "Name" column is "Page" supply the document metadata and the
    namespace prefix declarations; rows whose "Name" column is "Class" are
    turned into rdfs:Class resources in ``self.schema``.
    """

    def __init__(self):
        # Title and date copied from the "Page" row.
        self.metadata = {"title": "", "date": ""}
        # Value of the "defines" column of the "Page" row; it is compared with
        # the prefix of each class CURIE in convertClass.
        self.defines = ""
        # rdflib graph that accumulates the generated triples.
        self.schema = Graph()
        # Maps each declared prefix to its namespace URI string.
        self.namespaces = {}

    def convertDiagCSV(self, fname):
        """Read the CSV file ``fname`` and add its content to the schema.

        Rows are handled in file order, so the "Page" row that declares the
        prefixes has to come before the "Class" rows that use them.
        """
        with open(fname, "r") as diag_file:
            for row in DictReader(diag_file):
                kind = row["Name"]
                if kind == "Page":
                    self.convertMetadata(row)
                    self.convertNamespaces(row["prefixes"])
                elif kind == "Class":
                    self.convertClass(row)

    def convertClass(self, class_info):
        """Add the triples describing one "Class" row to the schema.

        Every class is typed as rdfs:Class and, unless its prefix is empty,
        linked to its namespace with rdfs:isDefinedBy. Label, comment, scope
        note and superclasses are added only when the prefix of the class
        CURIE equals ``self.defines``; empty or missing values are skipped.

        Raises:
            Exception: when the class CURIE or a superclass CURIE is malformed
                or uses a prefix that has not been declared.
        """
        curi = class_info["Text Area 1"]
        ns_id, _name = splitCuri(curi)
        classRef = curi2URIRef(curi, self.namespaces)

        # Common to every class, whichever namespace it belongs to.
        self.schema.add((classRef, RDF.type, RDFS.Class))
        if ns_id != "":
            self.schema.add((classRef, RDFS.isDefinedBy, URIRef(self.namespaces[ns_id])))

        if ns_id != self.defines:
            # Classes from other namespaces are only declared, not described.
            return

        literal_columns = (
            ("rdfs:label", RDFS.label),
            ("rdfs:comment", RDFS.comment),
            ("skos:scopenote", SKOS.scopeNote),
        )
        for column, predicate in literal_columns:
            value = class_info.get(column)
            if value:
                self.schema.add((classRef, predicate, Literal(value)))

        # Superclasses are resources, so they are emitted as URIRefs rather
        # than literals.
        # NOTE(review): assumes several superclasses are written one CURIE per
        # line, like the prefix declarations -- confirm against the diagram.
        for parent_curi in (class_info.get("rdfs:subclassof") or "").split("\n"):
            parent_curi = parent_curi.strip()
            if parent_curi:
                parentRef = curi2URIRef(parent_curi, self.namespaces)
                self.schema.add((classRef, RDFS.subClassOf, parentRef))

    def convertMetadata(self, row):
        """Copy title, date and the "defines" value from a "Page" row."""
        self.metadata["title"] = row["dct:title"]
        self.metadata["date"] = row["dct:date"]
        self.defines = row["defines"]

    def convertNamespaces(self, ns_info):
        """Register the prefix declarations contained in ``ns_info``.

        ``ns_info`` holds one declaration per line in the form
        ``prefix: uri``. Each declaration is bound on the schema graph and
        recorded in ``self.namespaces``. Blank lines are ignored; lines that
        do not match the expected form are reported on stdout and skipped.

        Raises:
            Exception: when ``ns_info`` is not a string.
        """
        if not isinstance(ns_info, str):
            raise Exception("Namespace info should be a string")
        for ns_def in ns_info.split("\n"):
            if not ns_def.strip():
                continue
            pre, separator, uri = ns_def.partition(": ")
            pre, uri = pre.strip(), uri.strip()
            if not separator or not uri:
                print("Could not process line in namespace info:", ns_def, ".")
                # Skip the line so no unbound or stale prefix/URI gets bound.
                continue
            self.schema.bind(pre, Namespace(URIRef(uri)))
            self.namespaces[pre] = uri



In [99]:
# Convert the diagram export named below and print the serialized schema graph.
fname = "DESM_Model2.csv"
d2c = Diag2RDFSConverter()
d2c.convertDiagCSV(fname)
print(d2c.schema.serialize())


@prefix desm: <https://github.com/t3-innovation-network/desm/tree/main/schemas/desmSchema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .

desm:AbstractClassMapping a rdfs:Class ;
    rdfs:label "Abstract Class Mapping" ;
    rdfs:comment "Resource describing mappings by one or more Data Standards Organizations (DSOs) to a single abstract class." ;
    rdfs:isDefinedBy desm: .

desm:AbstractClassSet a rdfs:Class ;
    rdfs:label "Abstract Class Set" ;
    rdfs:comment "Resource aggregating mappings of two or more Abstract Class created by one or more Data Standards Organizations (DSOs)." ;
    rdfs:isDefinedBy desm: ;
    skos:scopeNote "This class is used only where a project creates mappings to more than one absbtract class that are intended by their creators to be viewed as a set." .




In [3]:
class PropertyDefinition(dict):
    """Property definition stored as a dict.

    Keys, all initialised to the empty string: "curi", "label", "comment",
    "domain" and "range".
    """
    def __init__(self) :
        super().__init__()
        for key in ("curi", "label", "comment", "domain", "range"):
            self[key] = ""

    def extractClassProp(self, domainCURI, propCURI):
        """Describe a property listed inside a class.

        The class CURIE ``domainCURI`` becomes the domain and ``propCURI``
        the property's own CURIE; the range is a fixed literal type.
        """
        self["curi"] = propCURI
        self["domain"] = domainCURI
        # NOTE(review): the RDF vocabulary has no term "Literal"; rdfs:Literal
        # is probably intended. Left unchanged because expanding it would need
        # an "rdfs" prefix to be declared wherever this CURIE is expanded.
        self["range"] = "rdf:Literal"

    def extractLinkProperties(self, row, class_defs=None):
        """Describe a property drawn as a line between two classes.

        ``row`` is a "Line" row. The end of the line whose arrow column is
        "None" supplies the domain and the end whose arrow column is "Arrow"
        supplies the range; in both cases the source end is checked first.

        ``class_defs`` maps row ids to class definitions that have a "curi"
        entry. When omitted, the module-level ``classes`` mapping is used.
        """
        if class_defs is None:
            class_defs = classes
        self["curi"] = row["Text Area 1"]

        if row["Source Arrow"] == "None" :
            self["domain"] = class_defs[row["Line Source"]]["curi"]
        elif row["Destination Arrow"] == "None" :
            self["domain"] = class_defs[row["Line Destination"]]["curi"]

        if row["Destination Arrow"] == "Arrow" :
            self["range"] = class_defs[row["Line Destination"]]["curi"]
        elif row["Source Arrow"] == "Arrow" :
            self["range"] = class_defs[row["Line Source"]]["curi"]
        

In [6]:
# Pretty-printer used to dump the intermediate dictionaries.
pp = PrettyPrinter(indent = 2)
# CSV export of the diagram that the extraction cell reads.
diag_fname = "DESM_Model.csv"

In [7]:
# Build the intermediate class and property definitions from the diagram CSV.
# NOTE(review): Metadata and ClassDefinition are not defined in the visible
# cells; this cell relies on them already existing in the kernel.
classes = dict()      # row Id -> class definition
properties = list()   # property definitions, in file order
# NOTE(review): only the "rdf" prefix is declared here, so CURIEs that use any
# other prefix cannot be expanded against this mapping.
prefixes = {"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#"}
with open(diag_fname, "r") as diag_file:
    csvReader = DictReader(diag_file)
    metadata = Metadata()
    for row in csvReader:
        if row["Name"] == "Page":
            metadata.extractMetadata(row)
        elif row["Name"] == "Class":
            c = ClassDefinition()
            c.extractClassDef(row)
            classes[row["Id"]] = c
            class_props = c.unpackProperties(row["Text Area 2"])
            # index is the 1-based position of the property within its class.
            for index, prop in enumerate(class_props, start=1):
                p = PropertyDefinition()
                p.extractClassProp(c["curi"], prop)
                # "<class row id>.<position>"; not used yet, kept in case the
                # properties are later keyed by id.
                p_id = row["Id"] + "." + str(index)
                properties.append(p)
        elif row["Name"] == "Line":
            p = PropertyDefinition()
            p.extractLinkProperties(row)
            properties.append(p)
        # Rows of any other kind are ignored.

pp.pprint(prefixes)
pp.pprint(classes)

{'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'}
{ '6': { 'comment': 'Resource aggregating mappings of two or more Abstract '
                    'Class created by one or more Data Standards Organizations '
                    '(DSOs).',
         'curi': 'desm:AbstractClassSet',
         'isDefinedBy': '',
         'label': 'Abstract Class Set',
         'scopeNote': 'This class is used only where a project creates '
                      'mappings to more than one absbtract class that are '
                      'intended by their creators to be viewed as a set.',
         'subClassOf': ''},
  '7': { 'comment': 'Resource describing mappings by one or more Data '
                    'Standards Organizations (DSOs) to a single abstract '
                    'class.',
         'curi': 'desm:AbstractClassMapping',
         'isDefinedBy': '',
         'label': 'Abstract Class Mapping',
         'scopeNote': '',
         'subClassOf': ''}}


In [8]:
# Create the output graph and bind every known prefix on it.
schema = Graph()

for prefix, ns_uri_text in prefixes.items():
    ns_uri = URIRef(ns_uri_text)
    ns = Namespace(ns_uri)
    schema.bind(prefix, ns)
    if "base" == prefix.lower():
        # A prefix called "base" (any letter case) also becomes the base URI
        # of the graph. There is no `self` at cell level, so the attribute is
        # set on the schema graph itself.
        schema.base = ns_uri
schema.bind("skos", SKOS)
schema.bind("sdo", SDO)

In [9]:
# Emit an rdfs:Class for every extracted class definition.
for k, class_def in classes.items():
    curi = class_def["curi"]
    [ns_id, name] = splitCuri(curi)
    c = curi2URIRef(curi, prefixes)
    schema.add((c, RDF.type, RDFS.Class))
    # NOTE(review): the slice drops the last character of metadata["defines"],
    # presumably a trailing ":" -- confirm against the Metadata class.
    if ns_id == metadata["defines"][:-1]:
        definedBy = URIRef(prefixes[ns_id])
        schema.add((c, RDFS.isDefinedBy, definedBy))
        # Only non-empty annotations are emitted, so the graph never holds
        # empty label or comment literals.
        annotations = (
            ("label", RDFS.label),
            ("comment", RDFS.comment),
            ("scopeNote", SKOS.scopeNote),
        )
        for key, predicate in annotations:
            if class_def[key] != "":
                schema.add((c, predicate, Literal(class_def[key], "en")))

# Emit an rdf:Property for every extracted property definition.
for prop in properties:
    curi = prop["curi"]
    [ns_id, name] = splitCuri(curi)
    p = curi2URIRef(curi, prefixes)
    schema.add((p, RDF.type, RDF.Property))
    definedBy = URIRef(prefixes[ns_id])
    schema.add((p, RDFS.isDefinedBy, definedBy))
    if ns_id == metadata["defines"][:-1]:
        # Properties in the namespace named by metadata["defines"] get
        # rdfs:domain / rdfs:range.
        domain_predicate = RDFS.domain
        range_predicate = RDFS.range
    else:
        # Properties from any other namespace get the schema.org
        # domainIncludes / rangeIncludes predicates instead.
        domain_predicate = URIRef("http://schema.org/domainIncludes")
        range_predicate = URIRef("http://schema.org/rangeIncludes")
    if prop["domain"] != "":
        schema.add((p, domain_predicate, curi2URIRef(prop["domain"], prefixes)))
    if prop["range"] != "":
        schema.add((p, range_predicate, curi2URIRef(prop["range"], prefixes)))

print(schema.serialize())


KeyError: 'desm'