# Parsing a SPARQL query to extract basic graph patterns

Function to parse a SPARQL query and extract the BGP it contains to load them in a RDFLib graph, with support for federated queries

In [34]:
from rdflib import Namespace, URIRef, Variable, ConjunctiveGraph
from rdflib.plugins.sparql.parser import parseQuery
from rdflib.plugins.sparql.algebra import translateQuery
from rdflib.plugins.sparql.sparql import Query

up = Namespace("http://purl.uniprot.org/core/")
rh = Namespace("http://rdf.rhea-db.org/")
sqc = Namespace("http://example.org/sqc/") # SPARQL query check

# Some interesting links:
# https://rdflib.readthedocs.io/en/stable/_modules/rdflib/plugins/sparql/evaluate.html
# https://github.com/RDFLib/rdflib/blob/main/rdflib/plugins/sparql/evaluate.py
# https://github.com/vemonet/rdflib-endpoint/blob/main/example/app/custom_eval.py
def sparql_bgp_to_rdflib_graph(sparql_query: str, sparql_endpoint: str) -> ConjunctiveGraph:
    parsed_query = parseQuery(sparql_query)
    translated_query: Query = translateQuery(parsed_query)

    g = ConjunctiveGraph()
    g.bind("up", up)
    g.bind("rh", rh)
    g.bind("sqc", sqc)

    # Recursively check all parts of the query to find BGPs
    def process_part(part, endpoint: str):
        # print(part)
        if isinstance(part, list):
            for sub_pattern in part:
                process_part(sub_pattern, endpoint)
        if hasattr(part, "name"):
            print(part.name)
            if part.name == "BGP" or part.name == "TriplesBlock":
                print(part.triples)
                for triple in part.triples:
                    # TODO: handle when paths are provided? up:annotation/up:toast
                    # Maybe reuse the code from evaluate?
                    subj = triple[0]
                    pred = triple[1]
                    obj = triple[2]
                    # Replace variables with resources from the sqc namespace
                    subj = sqc[str(subj)] if isinstance(subj, Variable) else subj
                    pred = sqc[str(pred)] if isinstance(pred, Variable) else pred
                    obj = sqc[str(obj)] if isinstance(obj, Variable) else obj
                    g.add((subj, pred, obj, URIRef(endpoint)))


        if hasattr(part, "p"):
            process_part(part.p, endpoint)
        if hasattr(part, "p1"):
            process_part(part.p1, endpoint)
        if hasattr(part, "p2"):
            process_part(part.p2, endpoint)

        # Meeting a SERVICE clause
        # (can't be found in RDFLib eval because it's a special case, and they use the service_string directly with a regex)
        if hasattr(part, "graph") and hasattr(part, "service_string") and hasattr(part, "term"):
            process_part(part.graph, str(part.term))
        if hasattr(part, "where"):
            process_part(part.where, endpoint)
        if hasattr(part, "part"):
            process_part(part.part, endpoint)


    def extract_basic_graph_pattern(algebra):
        print(algebra)
        if hasattr(algebra, "p"):
            process_part(algebra.p, sparql_endpoint)

    extract_basic_graph_pattern(translated_query.algebra)
    return g

# Example SPARQL query
sparql_query = """
PREFIX up: <http://purl.uniprot.org/core/>
PREFIX rh: <http://rdf.rhea-db.org/>
SELECT ?uniprotCount ?rhea ?accession ?equation
WHERE {
  SERVICE <https://sparql.uniprot.org/sparql> {
      SELECT ?rhea (count(?uniprot) as ?uniprotCount) {
          ?uniprot up:annotation ?rhea .
          ?uniprot up:toast ?toooast .
      }
      GROUP BY ?rhea
  }
  ?rhea rh:accession ?accession .
  ?rhea rh:equation ?equation .
}
"""

graph = sparql_bgp_to_rdflib_graph(sparql_query, "https://sparql.rhea-db.org/sparql/")
print(graph.serialize(format="trig"))


SelectQuery_SelectQuery_{'p': Project_{'p': Join_{'p1': ServiceGraphPattern_{'service_string': 'SERVICE <https://sparql.uniprot.org/sparql> {\n      SELECT ?rhea (count(?uniprot) as ?uniprotCount) {\n          ?uniprot up:annotation ?rhea .\n          ?uniprot up:toast ?toooast .\n      }\n      GROUP BY ?rhea\n  }', 'term': rdflib.term.URIRef('https://sparql.uniprot.org/sparql'), 'graph': SubSelect_{'projection': [vars_{'var': rdflib.term.Variable('rhea'), '_vars': {rdflib.term.Variable('rhea')}}, vars_{'expr': Aggregate_Count_{'distinct': [], 'vars': rdflib.term.Variable('uniprot'), '_vars': {rdflib.term.Variable('uniprot')}}, 'evar': rdflib.term.Variable('uniprotCount'), '_vars': {rdflib.term.Variable('uniprotCount'), rdflib.term.Variable('uniprot')}}], 'where': GroupGraphPatternSub_{'part': [TriplesBlock_{'triples': [[rdflib.term.Variable('uniprot'), rdflib.term.URIRef('http://purl.uniprot.org/core/annotation'), rdflib.term.Variable('rhea')], [rdflib.term.Variable('uniprot'), rdfli