In [7]:
import logging
import os
from datetime import datetime
from re import sub
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse, urljoin
from urllib.request import urlopen, pathname2url, url2pathname

from dotenv import load_dotenv
from rdflib import BNode, URIRef, Literal, Graph, Namespace
from rdflib.collection import Collection
from rdflib.namespace import RDF, XSD, RDFS, OWL, SKOS, DCTERMS, NamespaceManager
from rdflib.plugins.sparql import prepareQuery
from rdflib.util import guess_format
load_dotenv()

from deep_translator import GoogleTranslator

from openai import AzureOpenAI


# getLogger() without a name returns the root logger, so the INFO level set
# here also applies to the module-level ``logging.info``/``logging.debug``
# calls used further down in this file.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def path2url(path):
    """Return a ``file:`` URL pointing at the given local path.

    The path is made absolute first, so a relative path is resolved against
    the current working directory.
    """
    absolute_path = os.path.abspath(path)
    url_path = pathname2url(absolute_path)
    return urljoin('file:', url_path)

# Working directory at the time the cell is run.
# NOTE(review): this name shadows the builtin ``dir``.
dir=os.getcwd()

# Namespaces used to build and look up terms.
PMDCO = Namespace('https://w3id.org/pmd/co/')
bfo2020_url='http://purl.obolibrary.org/obo/bfo/2020/bfo.owl'
BFO = Namespace(bfo2020_url+"/")
OBO = Namespace('http://purl.obolibrary.org/obo/')
PROV= Namespace('http://www.w3.org/ns/prov#')
IOFAV = Namespace('https://spec.industrialontologies.org/ontology/core/meta/AnnotationVocabulary/')
# NOTE(review): unlike the namespaces above this one has no trailing '/' or '#'.
PATO = Namespace('http://purl.obolibrary.org/obo/pato/releases/2023-05-18/pato-full.owl')

# Name written into the "PERSON: ..." annotation of every imported class.
editor="Thomas Hanke"

# Ontology file to curate; resolved relative to the working directory.
#filename="pmdco-qualities-no_pato.ttl"
filename="pmdco-qualities.ttl"

# file: URLs of the ontology to curate and of the local PATO / IAO copies.
this_ontology_url=path2url(filename)
pato_source="pato-full.owl"
pato_url=path2url(pato_source)
iao_source="iao.rdf"
iao_url=path2url(iao_source)

# Predicate used below to link a local class to the external term it was
# created from.
imported_from=URIRef('http://purl.obolibrary.org/obo/IAO_0000412')

# Result file: "curated_" + input name without its extension + ".rdf",
# serialized as RDF/XML.
output_filename="curated_"+filename.rsplit('.',1)[0]+'.rdf'
output_format='application/rdf+xml'

def get_base_uri(g: Graph):
    """Return the namespace bound to the ``base`` or the empty prefix.

    Args:
        g (Graph): Graph whose namespace manager is inspected.

    Returns:
        The matching namespace, or None if neither prefix is bound. When
        several bindings match, the last one listed by the namespace manager
        is returned.
    """
    matches = [
        uri
        for prefix, uri in g.namespace_manager.namespaces()
        if prefix.strip() in ('base', "")
    ]
    found = matches[-1] if matches else None
    if found:
        logging.debug(
            "found the following base or empty prefix namespace {}".format(found)
        )
    return found

# Snake case - your_term
def snake_case(s):
    """Convert a hyphenated or CamelCase string to lower snake_case."""
    spaced = s.replace('-', ' ')
    # Put a space in front of every run of capital letters ...
    spaced = sub('([A-Z]+)', r' \1', spaced)
    # ... and in front of every capital followed by lowercase letters, so an
    # acronym is split from the word that follows it.
    spaced = sub('([A-Z][a-z]+)', r' \1', spaced)
    words = spaced.split()
    return '_'.join(words).lower()

# Camel case - yourTerm
def lower_camel_case(s):
    """Convert a string to lowerCamelCase.

    Runs of ``_`` and ``-`` are treated as word separators, every word is
    title-cased, the spaces are removed and the first character is lowered.
    An empty result (empty input or separators only) is returned as an empty
    string instead of raising IndexError.
    """
    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
    if not s:
        return s
    return s[0].lower() + s[1:]

# Pascal case - YourTerm
def upper_camel_case(s):
    """Convert a string to UpperCamelCase (PascalCase).

    Runs of ``_`` and ``-`` become word separators; each word is title-cased
    (so capitals inside a word are lowered) and the spaces are dropped.
    """
    spaced = sub(r"(_|-)+", " ", s)
    return "".join(spaced.title().split(" "))

def strip_special_chars(s):
    """Replace every run of non-word characters in ``s`` with one space.

    Word characters are those matched by ``\\w`` (letters, digits and the
    underscore), so underscores and non-ASCII letters are kept.
    """
    # Raw string: '\W' in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    return sub(r'\W+', ' ', s)


def parse_graph(url: str, graph: Optional[Graph] = None, format: str = "") -> Graph:
    """Parse an RDF document from an http(s) or file URL into a graph.

    Args:
        url (str): ``http:``, ``https:`` or ``file:`` URL of the document.
        graph (Graph, optional): Existing graph to parse the data into. A new
            graph is created for each call when omitted.
        format (str, optional): rdflib parser format. When empty it is guessed
            from the path part of the URL.

    Returns:
        Graph: The graph holding the parsed triples, with the prefix ``meta``
        bound to ``url + "/"``.
    """
    # A Graph() default in the signature would be created once and shared by
    # every call, mixing the triples of unrelated documents.
    if graph is None:
        graph = Graph()
    logging.debug("parsing graph from {}".format(url))
    parsed_url = urlparse(url)
    META = Namespace(url + "/")
    if not format:
        format = guess_format(parsed_url.path)
    if parsed_url.scheme in ["https", "http"]:
        # Close the HTTP response once its body has been read.
        with urlopen(parsed_url.geturl()) as response:
            graph.parse(data=response.read(), format=format)
    elif parsed_url.scheme == "file":
        # Undo the percent-encoding applied when the file URL was built.
        graph.parse(url2pathname(parsed_url.path), format=format)
    else:
        logging.warning(
            "nothing parsed from {}: unsupported url scheme [{}]".format(
                url, parsed_url.scheme
            )
        )
    graph.bind("meta", META)
    return graph

def add_ontology_header(g):
    """Bind the prefixes used by this ontology on ``g`` and return ``g``."""
    prefix_bindings = (
        ('owl', OWL),
        ('bfo', BFO),
        ('obo', OBO),
        ('skos', SKOS),
        ('dcterms', DCTERMS),
        ('iof-av', IOFAV),
        ('pmdco', PMDCO),
        ('prov', PROV),
    )
    for prefix, namespace in prefix_bindings:
        g.bind(prefix, namespace)
    return g

# SPARQL queries prepared once at import time.
# NOTE(review): no initNs is passed, so the rdfs:/skos: prefixes are presumably
# resolved by rdflib's default bindings - confirm for the rdflib version in use.

# Every ?entity connected to ?parent by zero or more rdfs:subClassOf steps;
# ?parent is expected to be supplied when the query is run.
sub_classes = prepareQuery("SELECT ?entity WHERE {?entity rdfs:subClassOf* ?parent}")

# Every (entity, rdfs:label) pair.
all_labels = prepareQuery("SELECT ?entity ?label WHERE {?entity rdfs:label ?label}")

# Every (entity, label, definition) combination for entities that have both an
# rdfs:label and a skos:definition.
all_labels_definitions = prepareQuery("SELECT ?entity ?label ?definition WHERE { \
                                      ?entity rdfs:label ?label; \
                                        skos:definition ?definition. \
                                      }")

def get_all_sub_classes(superclass: URIRef, ontology: Graph) -> List[URIRef]:
    """Return every class reachable from ``superclass`` via rdfs:subClassOf.

    The prepared ``sub_classes`` query uses the path ``rdfs:subClassOf*``
    (zero or more steps) with ``?parent`` bound to ``superclass``.

    Args:
        superclass (URIRef): Class whose subclasses are collected.
        ontology (Graph): Graph to query.

    Returns:
        List[URIRef]: First column of every result row of the query.
    """
    rows = ontology.query(sub_classes, initBindings={"parent": superclass})
    classes = [row[0] for row in rows]
    logger.debug("Found following subclasses of {}: {}".format(superclass, classes))
    logger.debug("Found {} subclasses of {}".format(len(classes),superclass))
    return classes

def filter_entities(entity_list: List, filter_words: List, g: Graph)-> List:
    """Drop entities whose label contains a filter word, plus their subclasses.

    Args:
        entity_list (List): Entities to filter; the list itself is not modified.
        filter_words (List): Words matched case-insensitively as substrings of
            the rdfs:label of each entity.
        g (Graph): Graph providing the labels and the subclass hierarchy.

    Returns:
        List: The entities of ``entity_list`` that were kept, in their
        original order.
    """
    words = [word.lower() for word in filter_words]
    removed = set()
    for entity in entity_list:
        # Already dropped as a subclass of a filtered entity; its own
        # subclasses were dropped together with it.
        if entity in removed:
            continue
        label = g.value(entity, RDFS.label)
        # An entity without label cannot match; do not match on "None".
        if label is None:
            continue
        label = str(label)
        if any(word in label.lower() for word in words):
            logger.debug('found a entity {} with label {} to be filter out'.format(entity,label))
            # we dont want this term and all its subclasses
            removed.update(get_all_sub_classes(entity, g))
    res = [entity for entity in entity_list if entity not in removed]
    logger.info("filtered out {} entities because there labels contain one of the words {}".format(len(entity_list)-len(res),filter_words))
    return res

def import_entities(g: Graph, g_superclass: URIRef, from_graph: Graph, from_graph_superclass: URIRef, filters: Optional[List[str]]=None)-> Graph:
    """Create local classes in ``g`` for an external class hierarchy.

    All classes returned by ``get_all_sub_classes(from_graph_superclass,
    from_graph)`` that are not yet the object of an ``imported_from`` triple
    in ``g`` are processed. For every such class that has an rdfs:label, a
    class named ``PMDCO + UpperCamelCase(label)`` is added to ``g`` together
    with the ``imported_from`` link, an English label, the IAO_0000115
    definition (if any) and a "PERSON: <editor>" annotation. The external
    superclass itself is only linked to ``g_superclass``.

    Args:
        g (Graph): Graph receiving the new classes.
        g_superclass (URIRef): Local class that corresponds to
            ``from_graph_superclass``.
        from_graph (Graph): Graph the classes are taken from.
        from_graph_superclass (URIRef): Root of the hierarchy to import.
        filters (List[str], optional): Label filter words passed to
            ``filter_entities``; no filtering when empty or omitted.

    Returns:
        Graph: ``g`` with the added triples.
    """
    to_import_entities=get_all_sub_classes(from_graph_superclass,from_graph)
    # subtract already imported ones; a set keeps the membership test cheap
    imported_entities=set(g.objects(None,imported_from))
    to_add=[entity for entity in to_import_entities if entity not in imported_entities]
    # apply filter on labels
    if filters:
        to_add=filter_entities(to_add,filters,from_graph)
    added=0
    for entity in to_add:
        # the root class is not copied, only linked to its local counterpart
        if str(entity)==str(from_graph_superclass):
            g.add((g_superclass,imported_from,from_graph_superclass))
            continue
        label=None
        definition=None
        for _, p, o in from_graph.triples((entity,None,None)):
            if p==RDFS.label:
                label=str(o)
            elif p==OBO.IAO_0000115:
                definition=o
        if not label:
            # without a label no local IRI can be derived
            logger.debug("skipping {} because it has no rdfs:label".format(entity))
            continue
        short_iri=upper_camel_case(strip_special_chars(label))
        iri=URIRef(PMDCO+short_iri)
        g.add((iri,RDF.type,OWL.Class))
        g.add((iri,imported_from,entity))
        g.add((iri,RDFS.label,Literal(label,lang='en')))
        if definition:
            g.add((iri,SKOS.definition,definition))
        g.add((iri,OBO.IAO_0000117,Literal("PERSON: " + editor )))
        # count only entities that were really written to the graph
        added+=1
    logging.info("added {} entities from entity {} as subclasses to {}".format(added,from_graph_superclass,g_superclass))
    return g

# Copies the subclass relations between external terms onto the local classes
# that were imported from them.
def copy_subclass_relations(g: Graph, from_graph: Optional[Graph]=None):
    """Mirror rdfs:subClassOf relations of imported terms in ``g``.

    For every ``(s, imported_from, o)`` triple in ``g`` whose object is a
    URIRef, the superclasses of ``o`` in ``from_graph`` are looked up. When a
    superclass is itself the ``imported_from`` object of a class in ``g``,
    ``s rdfs:subClassOf <that class>`` is added to ``g``.

    Args:
        g (Graph): Graph holding the ``imported_from`` links; modified in place.
        from_graph (Graph, optional): Graph providing the external hierarchy.
            Defaults to the module-level ``pato`` graph.

    Returns:
        Graph: ``g`` with the added subclass relations.
    """
    if from_graph is None:
        from_graph = pato
    i=0
    # Snapshot the matches first: triples are added to g inside the loop.
    for s, _, o in list(g.triples((None,imported_from,None))):
        if not isinstance(o,URIRef):
            continue
        for item in from_graph.objects(o,RDFS.subClassOf):
            pmd_class = g.value(predicate=imported_from, object=item, any=False)
            if pmd_class:
                g.add((s,RDFS.subClassOf,pmd_class))
                i+=1
    logging.info("added {} subclass relations from equivalent pato entities".format(i))
    return g

# Azure OpenAI client used by translate_label_completion. Key, API version and
# endpoint are read from the environment variables AOAI_API_KEY,
# AOAI_API_VERSION and AOAI_API_BASE (the endpoint falls back to "").
chatclient = AzureOpenAI(
    api_key=os.environ.get("AOAI_API_KEY"),
    # https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning
    api_version=os.environ.get("AOAI_API_VERSION"),
    # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
    azure_endpoint= os.environ.get("AOAI_API_BASE",""),
)

def translate_label_completion(text: str, target_language: str="de", definition: str=""):
    """Ask the completion model for a translation of a label.

    Args:
        text (str): Label to translate.
        target_language (str): Language named in the prompt.
        definition (str): Optional definition appended to the prompt as help.

    Returns:
        str: Text of the first completion choice, stripped of surrounding
        whitespace and passed through ``strip_special_chars``.
    """
    prompt_parts = [
        f"Translate the following label into {target_language}: {text}, stick to the number of words and sequence of word classes if possible.\n"
    ]
    if definition:
        prompt_parts.append(f"Use the following definition as help: {definition}\n")
    completion = chatclient.completions.create(
        model="TEXTDAVINCI3",
        prompt="".join(prompt_parts),
        max_tokens=60,
        n=1,
        stop=None,
        temperature=0.7,
    )
    raw_text = completion.choices[0].text
    return strip_special_chars(raw_text.strip())

def translate_labels_gpt(g: Graph, language: str='de'):
    """Add labels in ``language`` translated by the completion model.

    Only entities returned by the ``all_labels_definitions`` query (i.e. with
    both an rdfs:label and a skos:definition) are considered. An entity gets a
    new label when it has an English label but none in ``language``; the
    definition is passed to the model as context.

    Args:
        g (Graph): Graph to read labels from and add translations to.
        language (str): Language tag of the labels to create.

    Returns:
        Graph: ``g`` with the added labels and curation status triples.
    """
    labels_by_entity={}
    definitions={}
    # collect everything first: g is modified in the second loop
    for thing, label, definition in g.query(all_labels_definitions):
        labels_by_entity.setdefault(thing,{})[getattr(label,'language',None)]=label
        definitions[thing]=definition
    i=0
    for thing, labels in labels_by_entity.items():
        label_en=labels.get('en')
        # needs an English source label and no label in the target language
        if not label_en or language in labels:
            continue
        translation=translate_label_completion(label_en,target_language=language,definition=definitions.get(thing,''))
        new_label=Literal(translation,lang=language)
        logging.info('adding [{}] label {} for entity {}'.format(language, new_label, thing))
        g.add((thing,RDFS.label,new_label))
        #add curation status - requires discussion
        g.add((thing,OBO.IAO_0000114,OBO.IAO_0000428))
        i+=1
    logging.info("translated {} labels to [{}]".format(i,language))
    return g

def translate_labels_google(g: Graph, language: str='de'):
    """Add labels in ``language`` translated with GoogleTranslator.

    An entity gets a new label when it has an English rdfs:label but none in
    ``language``.

    Args:
        g (Graph): Graph to read labels from and add translations to.
        language (str): Target language of the translator and language tag of
            the labels to create.

    Returns:
        Graph: ``g`` with the added labels and curation status triples.
    """
    translator=GoogleTranslator(source='auto', target=language)
    labels_by_entity={}
    # collect everything first: g is modified in the second loop
    for thing, label in g.query(all_labels):
        labels_by_entity.setdefault(thing,{})[getattr(label,'language',None)]=label
    i=0
    for thing, labels in labels_by_entity.items():
        label_en=labels.get('en')
        # needs an English source label and no label in the target language
        if not label_en or language in labels:
            continue
        new_label=Literal(translator.translate(label_en),lang=language)
        logging.info('adding [{}] label {} for entity {}'.format(language, new_label, thing))
        g.add((thing,RDFS.label,new_label))
        #add curation status - requires discussion
        g.add((thing,OBO.IAO_0000114,OBO.IAO_0000428))
        i+=1
    logging.info("translated {} labels to [{}]".format(i,language))
    return g

def entitle_all_labels(g: Graph):
    """Replace rdfs:label values by their ``str.title()`` form.

    Labels are grouped per entity and language tag (one label per tag is
    kept for processing). Every label that differs from its title-cased text
    is removed and re-added title-cased with the same language tag, and the
    entity is given the curation status triple.

    Args:
        g (Graph): Graph whose labels are rewritten in place.

    Returns:
        Graph: The same graph ``g``.
    """
    labels_by_entity={}
    for thing, label in g.query(all_labels):
        labels_by_entity.setdefault(thing,{})[label.language]=label

    replaced=0
    for thing, by_language in labels_by_entity.items():
        for label in by_language.values():
            current_text=str(label)
            entitled_label=current_text.title()
            if current_text==entitled_label:
                continue
            logging.debug('replacing label [{}] with [{}] on {}'.format(current_text, entitled_label, thing))
            # swap the old literal for the title-cased one
            g.remove((thing,RDFS.label,label))
            g.add((thing,RDFS.label,Literal(entitled_label,lang=getattr(label,'language',""))))
            # add curation status - requires discussion
            g.add((thing,OBO.IAO_0000114,OBO.IAO_0000428))
            replaced+=1
    logging.info("entiteled {} labels".format(replaced))
    return g


def copy_label_definitions_from_external_terms(source: Graph, target: Graph):
    """Copy missing rdfs:label values from ``target`` into ``source``.

    Every URIRef used as subject or predicate in ``source`` that has no
    rdfs:label there is looked up in ``target``; all labels found for it are
    added to ``source``. Only rdfs:label triples are copied.

    Args:
        source (Graph): Graph that is checked for unlabelled terms and that
            receives the labels.
        target (Graph): Graph the labels are read from.

    Returns:
        Graph: ``source`` with the added labels.
    """
    candidates=list(source.subjects(unique=True))+list(source.predicates(unique=True))
    labelled_in_source={thing for thing, _ in source.query(all_labels)}
    subjects_without_labels=[
        term for term in candidates
        if isinstance(term,URIRef) and term not in labelled_in_source
    ]
    logger.info("found the following subjects without rdfs:label: {}".format(subjects_without_labels))
    logger.info("graph has {} entities without rdfs:label".format(len(subjects_without_labels)))

    labelled_in_target={thing for thing, _ in target.query(all_labels)}
    found_in_target=[term for term in subjects_without_labels if term in labelled_in_target]
    logger.debug("found the following subjects in target:\n{}".format(found_in_target))

    copied=0
    for term in found_in_target:
        for label in target.objects(term,RDFS.label):
            source.add((term,RDFS.label,label))
            copied+=1
    logger.info("added {} labels for {} subjects found in target".format(copied,len(found_in_target)))
    return source


In [2]:
# Each external ontology gets its own fresh Graph so that the PATO and IAO
# triples are never mixed in one graph object.
pato=parse_graph(pato_url, graph=Graph())
iao=parse_graph(iao_url, graph=Graph())

In [9]:
#filename="curated_pmdco-qualities.rdf"
#this_ontology_url=path2url(filename)

# Load the ontology to curate into a fresh graph and bind the common prefixes.
print(this_ontology_url)
onto=Graph()
onto=parse_graph(this_ontology_url,graph=onto)
onto=add_ontology_header(onto)
# adding rdfs:label for increased readability in editor
onto=copy_label_definitions_from_external_terms(onto,iao)

# Each entry: (local class found by its English label, PATO root class to
# import below it, words that exclude a PATO term - and its subclasses -
# when they occur in its label).
default_filter=['increased', 'decreased', 'normal']
pato_map=[
    (onto.value(predicate=RDFS.label,object=Literal("Color", lang="en")),OBO.PATO_0000014,default_filter),
    (onto.value(predicate=RDFS.label,object=Literal("Shape", lang="en")),OBO.PATO_0000052,default_filter),
    (onto.value(predicate=RDFS.label,object=Literal("Size", lang="en")),OBO.PATO_0000117,["uniform", 'increased', 'decreased', 'normal','irregular']),
    (onto.value(predicate=RDFS.label,object=Literal("Structure", lang="en")),OBO.PATO_0000141,['accumulation','acinar','apoptotic','autogenous', 'fibrinoid', 'neoplastic','with','attachment','complete','water','increased', 'decreased', 'normal', 'maximal', 'from', 'to']),
    (onto.value(predicate=RDFS.label,object=Literal("Texture", lang="en")),OBO.PATO_0000150,default_filter),
    (onto.value(predicate=RDFS.label,object=Literal("Spatial Pattern", lang="en")),OBO.PATO_0000060,default_filter),
    #(onto.value(predicate=RDFS.label,object=Literal("Molecular Quality", lang="en")),OBO.PATO_0002182,["affinity", "concentration", "osmolality"," osmolarity"]),
    (onto.value(predicate=RDFS.label,object=Literal("Odor", lang="en")),OBO.PATO_0000058,default_filter),
    (onto.value(predicate=RDFS.label,object=Literal("Organismal Quality", lang="en")),OBO.PATO_0001995,["resistance", "response", "sensitivity", 'increased', 'decreased', 'normal']),
    (onto.value(predicate=RDFS.label,object=Literal("Cellular Quality", lang="en")),OBO.PATO_0001396,default_filter),
    ]
# Entries whose local class was not found in onto are skipped.
for target, source, filters in pato_map:
    if target and source:
        onto=import_entities(g=onto,g_superclass=target,from_graph=pato,from_graph_superclass=source,filters=filters)


# Rebuild the class hierarchy between the imported classes, add German labels
# and title-case all labels before writing the result.
onto=copy_subclass_relations(onto)
#onto=translate_labels_google(onto,language='de')
onto=translate_labels_gpt(onto,language='de')

onto=entitle_all_labels(onto)

onto.serialize(output_filename,format=output_format)

file:///C:/Users/hanke/projects/core-ontology/modules/pmdco-qualities.ttl


INFO:root:found the following subjects without rdfs:label: [rdflib.term.URIRef('https://w3id.org/pmd/co'), rdflib.term.URIRef('https://w3id.org/pmd/co/toValue'), rdflib.term.URIRef('https://w3id.org/pmd/co/relatesTo'), rdflib.term.URIRef('https://w3id.org/pmd/co/characteristicOf'), rdflib.term.URIRef('https://w3id.org/pmd/co/fromValue'), rdflib.term.URIRef('http://purl.org/dc/terms/bibliographicCitation'), rdflib.term.URIRef('http://www.w3.org/ns/prov#constraints'), rdflib.term.URIRef('http://purl.org/dc/terms/created'), rdflib.term.URIRef('http://purl.org/dc/terms/title'), rdflib.term.URIRef('https://w3id.org/pmd/co/symbol'), rdflib.term.URIRef('http://www.w3.org/ns/prov#editorialNote'), rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'), rdflib.term.URIRef('http://www.w3.org/2000/01/rdf-schema#subClassOf'), rdflib.term.URIRef('http://www.w3.org/2000/01/rdf-schema#label'), rdflib.term.URIRef('http://www.w3.org/2002/07/owl#equivalentClass'), rdflib.term.URIRef('http:

<Graph identifier=Ndde263ba2716461796c24de15ac708fe (<class 'rdflib.graph.Graph'>)>