In [None]:
%pip install -r requirements.txt

In [None]:
import os
import json
from dotenv import load_dotenv 

# Populate os.environ from app.properties (the AWS_* settings read later via os.getenv)
load_dotenv(dotenv_path="app.properties")

In [None]:

"""
============================================================================

Build the graph from geojson feature data

============================================================================
"""
import fiona
from shapely import within, overlaps, dwithin
from shapely.geometry import shape
from rdflib import Dataset, URIRef, Literal, Namespace
from rdflib.namespace import RDF, RDFS, XSD

"""
Define the in-memory graph
"""
# Base IRI shared by every class, predicate and feature URI built in this notebook
NAMESPACE = Namespace("https://terraframe.com/example/graph")

# Quad store holding a single named graph whose IRI is NAMESPACE + "#"
dataset = Dataset()
graph = dataset.graph(URIRef(NAMESPACE + "#"))


# Node type metadata. Each entry gives:
#   type     - local name of the RDF class, also used as the prefix of each feature URI
#   location - GeoJSON file the features are read from
#   props    - extra feature properties to turn into attribute triples: the GeoJSON
#              property 'name' is stored under predicate 'target'; 'type' selects the
#              literal datatype
channelType = {
    "type": "ChannelReach",
    "location": "data/geojson/ChannelReach.geojson",
    "props": [],
}
leveeType = {
    "type": "LeveeArea",
    "location": "data/geojson/LeveeArea.geojson",
    "props": [],
}
areaType = {
    "type": "LeveedArea",
    "location": "data/geojson/LeveedArea.geojson",
    "props": [],
}
tractType = {
    "type": "CensusTract",
    "location": "data/geojson/CensusTract.geojson",
    "props": [
        {"name": "population", "target": "CensusTract-population", "type": "integer"},
    ],
}
zoneType = {
    "type": "SchoolZone",
    "location": "data/geojson/SchoolZone.geojson",
    "props": [],
}
schoolType = {
    "type": "School",
    "location": "data/geojson/School.geojson",
    "props": [
        {"name": "population", "target": "School-population", "type": "integer"},
    ],
}

# Every node type, in the order their features are loaded into the graph
nodes = [channelType, leveeType, areaType, tractType, zoneType, schoolType]

# Edge type metadata. Each entry gives:
#   type     - local name of the edge predicate
#   function - spatial test deciding whether an edge is created between two features
#   sources  - node types whose features become edge subjects
#   targets  - node types whose features become edge objects
edges = [
    {"type": "HasFloodRisk", "function": "WITHIN", "sources": [areaType], "targets": [schoolType]},
    {"type": "HasFloodZone", "function": "OVERLAPS", "sources": [leveeType], "targets": [areaType]},
    {"type": "HasSchoolZone", "function": "WITHIN", "sources": [zoneType], "targets": [schoolType]},
    {"type": "ChannelHasLevee", "function": "DISTANCE", "sources": [channelType], "targets": [leveeType]},
    {"type": "TractAtRisk", "function": "OVERLAPS", "sources": [tractType], "targets": [areaType]},
]

# Create the attribute triples and add them to the graph.
# Loop names avoid the builtins `id` and `type`; rebinding `type` at module level
# breaks later code that calls type(...).
for nodeType in nodes:

    # RDF class shared by every feature of this node type
    typeUri = URIRef(NAMESPACE + "#" + nodeType['type'])

    # The context manager closes the GeoJSON file even if a feature fails to convert
    with fiona.open(nodeType['location'], "r") as features:
        for feature in features:
            properties = feature['properties']

            # Feature URI is <namespace>#<type>-<code>; edges are built with the same scheme
            featureUri = URIRef(NAMESPACE + "#" + nodeType['type'] + '-' + properties['code'])

            # Code, label and rdf:type triples for the feature
            graph.add((featureUri, URIRef(NAMESPACE + "#code"), Literal(properties['code'])))
            graph.add((featureUri, RDFS.label, Literal(properties['label'])))
            graph.add((featureUri, RDF.type, typeUri))

            #
            # Process the extra feature properties defined in the
            # type metadata and create attribute triples for them
            #
            for prop in nodeType['props']:
                value = properties[prop['name']]

                # A null property would otherwise be stored as the ill-typed
                # literal "None"^^xsd:int, which breaks numeric aggregates
                if value is None:
                    continue

                predicate = URIRef(NAMESPACE + "#" + prop['target'])
                datatype = XSD.int if prop['type'] == 'integer' else XSD.string

                graph.add((featureUri, predicate, Literal(str(value), datatype=datatype)))

"""
============================================================================

Compute the edges and add them as triples

============================================================================
"""
# Search radius for the 'DISTANCE' edge function. dwithin() measures in the units of
# the features' coordinate reference system, which are not necessarily metres.
# NOTE(review): GeoJSON coordinates are normally WGS84 degrees, where 100 would be a
# very large radius -- confirm the intended value/unit against the source data.
EDGE_DISTANCE = 100

# Spatial test for each edge 'function', called as test(sourceGeom, targetGeom).
# The target is tested against the source, e.g. WITHIN means "target lies within source".
EDGE_TESTS = {
    'WITHIN': lambda sourceGeom, targetGeom: within(targetGeom, sourceGeom),
    'OVERLAPS': lambda sourceGeom, targetGeom: overlaps(targetGeom, sourceGeom),
    'DISTANCE': lambda sourceGeom, targetGeom: dwithin(targetGeom, sourceGeom, EDGE_DISTANCE),
}

# Features already read from disk, keyed by (type name, file location)
loadedFeatures = {}


def loadFeatures(nodeType):
    """
    Return the list of (feature URI, Shapely geometry) pairs for a node type.

    Each GeoJSON file is read and converted to Shapely geometries only once; later
    calls reuse the cached list. The original loop re-read the target file and
    re-converted every target geometry once per source feature.
    """
    key = (nodeType['type'], nodeType['location'])
    if key not in loadedFeatures:
        with fiona.open(nodeType['location'], "r") as features:
            loadedFeatures[key] = [
                (
                    URIRef(NAMESPACE + "#" + nodeType['type'] + '-' + feature['properties']['code']),
                    shape(feature['geometry']),
                )
                for feature in features
            ]
    return loadedFeatures[key]


for edge in edges:
    edgeTest = EDGE_TESTS.get(edge['function'])
    if edgeTest is None:
        # Fail loudly: an unknown function would otherwise silently produce no edges
        raise ValueError(
            f"Unknown edge function '{edge['function']}' for edge type '{edge['type']}'"
        )

    # URI of the edge (predicate) type; named to avoid shadowing the builtin `type`
    predicate = URIRef(NAMESPACE + "#" + edge['type'])

    for sourceType in edge['sources']:
        for targetType in edge['targets']:
            targets = loadFeatures(targetType)

            # Compare every source feature with every target feature
            for sourceId, sourceGeom in loadFeatures(sourceType):
                for targetId, targetGeom in targets:
                    if edgeTest(sourceGeom, targetGeom):
                        graph.add((sourceId, predicate, targetId))

# Release the cached geometries once all edges are computed
loadedFeatures.clear()
        
"""
Hard-coded edges generated from outside analytics tool
"""
# Named predicate instead of rebinding the builtin `type`, which exectue_sparql relies on
flowsInto = URIRef(NAMESPACE + "#FlowsInto")

# (reach, reach it flows into) pairs of ChannelReach codes from the external tool
flowPairs = [
    ("CEMVK_RR_03_ONE_28", "CEMVK_RR_03_ONE_27"),
    ("CEMVK_RR_03_ONE_27", "CEMVK_RR_03_ONE_26"),
    ("CEMVK_RR_03_ONE_26", "CEMVK_RR_03_ONE_25"),
]

for fromCode, toCode in flowPairs:
    graph.add((
        URIRef(NAMESPACE + "#ChannelReach-" + fromCode),
        flowsInto,
        URIRef(NAMESPACE + "#ChannelReach-" + toCode),
    ))

"""
Export the graph to a file
"""
# The context manager flushes and closes the file; TriG documents are always UTF-8
with open('data/export.trig', 'w', encoding='utf-8') as exportFile:
    print(dataset.serialize(format="trig"), file=exportFile)

In [None]:

# Tool specification (JSON) handed to Bedrock so the model knows how to call the SPARQL tool
def get_tool_spec():
    """
    Build the Converse API tool specification for the SPARQL tool.

    The input schema is a JSON Schema object with a single required string property,
    ``statement``, holding a SPARQL SELECT statement.
    See https://json-schema.org/understanding-json-schema/reference.

    :return: A fresh ``{"toolSpec": ...}`` dict for an entry of ``toolConfig["tools"]``.
    """
    statement_property = {
        "type": "string",
        "description": "SPARQL select statement.",
    }

    input_schema = {
        "type": "object",
        "properties": {"statement": statement_property},
        "required": ["statement"],
    }

    tool_spec = {
        "name": "Sparql_Tool",
        "description": "Get a result set from a triple store, based on a SPARQL select statement.",
        "inputSchema": {"json": input_schema},
    }

    return {"toolSpec": tool_spec}

# Implementation of the tool which executes SPARQL on the in-memory dataset
def exectue_sparql(input_data):
    """
    Execute a SPARQL SELECT statement against the in-memory ``dataset``.

    :param input_data: Tool input dict; the query is read from its ``"statement"`` key.
    :return: On success, a list with one dict per result row, mapping each projected
             variable name to the string form of its value ("" when the variable is
             unbound in that row). On failure, a dict
             ``{"error": <exception class name>, "message": <error text>}``.
             Both shapes are JSON-serializable, so they can be sent back to the model.
    """
    statement = input_data.get("statement")

    # The statement comes from the model, so any failure (parse error, bad input, ...)
    # is reported back as data instead of crashing the conversation loop.
    try:
        qres = dataset.query(statement)

        # ASK/CONSTRUCT/DESCRIBE results have no projected variables to tabulate
        if qres.type != "SELECT":
            return {
                "error": "UnsupportedQuery",
                "message": f"Only SPARQL SELECT statements are supported, got {qres.type}.",
            }

        variables = list(qres.vars)
        rows = []
        for row in qres:
            data = {}
            for var in variables:
                value = row[var]
                # Unbound variables (e.g. from OPTIONAL) are None; report them as ""
                data[str(var)] = "" if value is None else str(value)
            rows.append(data)

        return rows

    except Exception as e:
        # Use the class *name* (a str) so the result stays JSON-serializable; going through
        # e.__class__ also keeps working if the builtin `type` is shadowed at module level.
        return {"error": e.__class__.__name__, "message": str(e)}



In [None]:
# Smoke-test the SPARQL tool against the in-memory dataset:
# total population over every CensusTract.
statement = """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>	
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>

SELECT (SUM(?population) as ?totalPopulation)	
FROM <https://terraframe.com/example/graph#>
WHERE {
  ?censusTract obj:CensusTract-population ?population .	
} 
"""

input_data = dict(statement=statement)
result = exectue_sparql(input_data)

# Last expression of the cell, so the notebook displays the JSON text
json.dumps(result)

In [None]:

"""
This demo illustrates a tool use scenario using Amazon Bedrock's Converse API and a tool that runs SPARQL queries.
The script interacts with a foundation model on Amazon Bedrock and an in-memory graph to provide location
information based on user input.
"""

import boto3
import logging
from enum import Enum

from classes import tool_use_print_utils as output

logging.basicConfig(level=logging.INFO, format="%(message)s")



# For the most recent list of models supported by the Converse API's tool use functionality, visit:
# https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference.html
class SupportedModels(Enum):
    """Bedrock model identifiers this demo can pass as the Converse ``modelId``."""

    # No trailing commas: "value", would make the member's value a 1-tuple,
    # which is not a valid modelId.
    CLAUDE_SONNET = "anthropic.claude-3-5-sonnet-20241022-v2:0"
    CLAUDE_SONNET_3_7 = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"


# Set the model ID, e.g., Claude 3.7 Sonnet.
MODEL_ID = SupportedModels.CLAUDE_SONNET_3_7.value

# Define the system prompt with the instructions for the agent.
# The example queries must only use names that exist in the graph built above
# (e.g. the code predicate is <...#code>), otherwise the model copies names that
# match nothing and every query returns an empty result.
SYSTEM_PROMPT = """
You are a location analysis assistant that provides location information for user-specified locations using only
the Sparql_Tool, which expects a SPARQL SELECT statement. The user is going to ask about a location.

Based on the rules and the schema below, generate a SPARQL SELECT statement for the location. Limit the SPARQL result set to a maximum of 100 rows.
Finally, use the Sparql_Tool to execute the SPARQL SELECT statement and return the results.
If the Sparql_Tool returns no rows, tell the user that you were unable to find any results for that question and to ask a different question, then STOP.

Instructions:
- Use only the node types and properties provided in the schema.
- Do not use any node types and properties that are not explicitly provided.
- Include all necessary prefixes.
- Use relationship properties only along the relationship paths provided, respecting the direction of each relationship path.

The query should follow this guidance:
- Always generate the FROM clause using the graph <https://terraframe.com/example/graph#>.
```
// Incorrect
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>
PREFIX geo: <http://www.opengis.net/ont/geosparql#>

SELECT DISTINCT ?schoolZoneName
WHERE {
    ?leveeArea obj:code "LEV_A_229" .
    ?leveeArea obj:HasFloodZone ?leveedArea .
    ?leveedArea obj:HasFloodRisk ?school .
    ?schoolZone obj:HasSchoolZone ?school .
    ?schoolZone rdfs:label ?schoolZoneName .
}

// Correct
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>
PREFIX geo: <http://www.opengis.net/ont/geosparql#>

SELECT DISTINCT ?schoolZoneName
FROM <https://terraframe.com/example/graph#>
WHERE {
    ?leveeArea obj:code "LEV_A_229" .
    ?leveeArea obj:HasFloodZone ?leveedArea .
    ?leveedArea obj:HasFloodRisk ?school .
    ?schoolZone obj:HasSchoolZone ?school .
    ?schoolZone rdfs:label ?schoolZoneName .
}
```

- Self-referencing relationships (such as FlowsInto) must always be written as a property path with *
```
// Incorrect
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>
PREFIX geo: <http://www.opengis.net/ont/geosparql#>

SELECT DISTINCT ?leveeAreaCode
FROM <https://terraframe.com/example/graph#>
WHERE {
  ?parent rdf:type obj:ChannelReach ;
    obj:code "CEMVK_RR_03_ONE_27" .
  ?parent obj:FlowsInto ?channel .
  ?channel obj:ChannelHasLevee ?leveeArea .
  ?leveeArea obj:code ?leveeAreaCode .
}

// Correct
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>
PREFIX geo: <http://www.opengis.net/ont/geosparql#>

SELECT DISTINCT ?leveeAreaCode
FROM <https://terraframe.com/example/graph#>
WHERE {
  ?parent rdf:type obj:ChannelReach ;
    obj:code "CEMVK_RR_03_ONE_27" .
  ?parent obj:FlowsInto* ?channel .
  ?channel obj:ChannelHasLevee ?leveeArea .
  ?leveeArea obj:code ?leveeAreaCode .
}
```
- Aggregate expressions must always be wrapped in parentheses together with their variable alias
```
// Incorrect
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>

SELECT SUM(?population) as ?totalPopulation
FROM <https://terraframe.com/example/graph#>
WHERE {
  ?censusTract obj:code "CEMVK_RR_03_ONE_27" .
  ?censusTract obj:CensusTract-population ?population .
}

// Correct
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>

SELECT (SUM(?population) as ?totalPopulation)
FROM <https://terraframe.com/example/graph#>
WHERE {
  SELECT DISTINCT ?censusTract ?population
  WHERE {
    ?censusTract obj:code "CEMVK_RR_03_ONE_27" .
    ?censusTract obj:CensusTract-population ?population .
  }
}
```
- Aggregation functions must always restrict values to their distinct subjects
```
// Incorrect
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>

SELECT (SUM(?population) as ?totalPopulation)
FROM <https://terraframe.com/example/graph#>
WHERE {
  ?censusTract obj:code "CEMVK_RR_03_ONE_27" .
  ?censusTract obj:CensusTract-population ?population .
}

// Correct
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX obj: <https://terraframe.com/example/graph#>

SELECT (SUM(?population) as ?totalPopulation)
FROM <https://terraframe.com/example/graph#>
WHERE {
  SELECT DISTINCT ?censusTract ?population
  WHERE {
    ?censusTract obj:code "CEMVK_RR_03_ONE_27" .
    ?censusTract obj:CensusTract-population ?population .
  }
}
```
Note: Be as concise as possible.
Do not include any explanations or apologies in your responses.
The statement passed to the Sparql_Tool must contain only the generated SPARQL query, with no other text.


Schema:
In the following, each IRI is followed by the local name and optionally its description in parentheses.
The graph supports the following node types:

<https://terraframe.com/example/graph#ChannelReach> (ChannelReach),
<https://terraframe.com/example/graph#School> (School),
<https://terraframe.com/example/graph#LeveeArea> (LeveeArea),
<https://terraframe.com/example/graph#LeveedArea> (LeveedArea),
<https://terraframe.com/example/graph#SchoolZone> (SchoolZone),
<https://terraframe.com/example/graph#CensusTract> (CensusTract),

The graph supports the following relationships:
<http://www.w3.org/2000/01/rdf-schema#label> (label),
<http://www.w3.org/1999/02/22-rdf-syntax-ns#type> (type),
<https://terraframe.com/example/graph#code> (code),
<https://terraframe.com/example/graph#School-population> (School-population),
<https://terraframe.com/example/graph#CensusTract-population> (CensusTract-population),
<https://terraframe.com/example/graph#FlowsInto> (FlowsInto),
<https://terraframe.com/example/graph#ChannelHasLevee> (ChannelHasLevee),
<https://terraframe.com/example/graph#HasFloodZone> (HasFloodZone),
<https://terraframe.com/example/graph#HasSchoolZone> (HasSchoolZone),
<https://terraframe.com/example/graph#TractAtRisk> (TractAtRisk),
<https://terraframe.com/example/graph#HasFloodRisk> (HasFloodRisk),

The data model between types is the following:
(ChannelReach)-[FlowsInto]->(ChannelReach),
(ChannelReach)-[ChannelHasLevee]->(LeveeArea),
(LeveeArea)-[HasFloodZone]->(LeveedArea),
(CensusTract)-[TractAtRisk]->(LeveedArea),
(LeveedArea)-[HasFloodRisk]->(School),
(SchoolZone)-[HasSchoolZone]->(School)

If the user asks about population use CensusTract-population. If the user asks about the number of students use School-population.

"""

# Maximum number of model/tool round trips (nested _process_model_response calls)
# allowed while answering a single user request.
# This helps prevent infinite loops and potential performance issues.
MAX_RECURSIONS = 5


class GraphToolDemo:
    """
    Demonstrates the tool use feature with the Amazon Bedrock Converse API.

    Every user request is sent to MODEL_ID together with SYSTEM_PROMPT and the
    Sparql_Tool specification. Tool-use requests from the model are answered by
    running the SPARQL statement with exectue_sparql() and sending the result back
    to the model until it produces its final answer.
    """

    def __init__(self):
        # System prompt as a list of Converse content blocks
        self.system_prompt = [{"text": SYSTEM_PROMPT}]

        # Tool configuration exposing only the SPARQL tool
        self.tool_config = {"tools": [get_tool_spec()]}

        # Bedrock Runtime client. Region and credentials come from the AWS_REGION,
        # AWS_KEY_ID and AWS_SECRET_KEY environment variables (None when unset).
        self.bedrockRuntimeClient = boto3.client(
            "bedrock-runtime",
            region_name=os.getenv('AWS_REGION'),
            aws_access_key_id=os.getenv('AWS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_KEY')
        )

    def run(self):
        """
        Starts the conversation with the user and handles the interaction with Bedrock.

        Reads requests until the user enters 'x'. If a request is aborted because the
        recursion limit was reached, its messages are removed from the history so the
        conversation still alternates user/assistant turns for the next request.
        """
        # Print the greeting and a short user guide
        output.header()

        # Start with an empty conversation
        conversation = []

        # Get the first user input
        user_input = self._get_user_input()

        while user_input is not None:
            # Remember where this turn starts so an aborted turn can be rolled back
            turn_start = len(conversation)

            # Create a new message with the user input and append it to the conversation
            message = {"role": "user", "content": [{"text": user_input}]}
            conversation.append(message)

            # Send the conversation to Amazon Bedrock
            bedrock_response = self._send_conversation_to_bedrock(conversation)

            # Recursively handle the model's response until the model has returned
            # its final response or the recursion counter has reached 0
            completed = self._process_model_response(
                bedrock_response, conversation, max_recursion=MAX_RECURSIONS
            )

            if not completed:
                # Drop the unfinished exchange (which ends with a user tool-result
                # message) so the next request is not sent after another user turn
                del conversation[turn_start:]

            # Repeat the loop until the user decides to exit the application
            user_input = self._get_user_input()

        output.footer()

    def _send_conversation_to_bedrock(self, conversation):
        """
        Sends the conversation, the system prompt, and the tool spec to Amazon Bedrock, and returns the response.

        :param conversation: The conversation history including the next message to send.
        :return: The response from Amazon Bedrock.
        """
        output.call_to_bedrock(conversation)

        # Send the conversation, system prompt, and tool configuration, and return the response
        return self.bedrockRuntimeClient.converse(
            modelId=MODEL_ID,
            messages=conversation,
            system=self.system_prompt,
            toolConfig=self.tool_config,
        )

    def _process_model_response(
        self, model_response, conversation, max_recursion=MAX_RECURSIONS
    ):
        """
        Processes the response received via Amazon Bedrock and performs the necessary actions
        based on the stop reason.

        :param model_response: The model's response returned via Amazon Bedrock.
        :param conversation: The conversation history.
        :param max_recursion: The maximum number of recursive calls allowed.
        :return: True when the model finished its turn, False when the request was
                 aborted because the recursion limit was reached.
        """
        if max_recursion <= 0:
            # Too many tool round trips could indicate an infinite loop. Abort only this
            # request (the caller rolls back the turn) instead of exiting the whole
            # process, which would also discard the in-memory graph.
            logging.warning(
                "Warning: Maximum number of recursions reached. Please try again."
            )
            return False

        # Append the model's response to the ongoing conversation
        message = model_response["output"]["message"]
        conversation.append(message)

        stop_reason = model_response["stopReason"]

        if stop_reason == "tool_use":
            # Forward everything to the tool use handler, which recurses back here
            return self._handle_tool_use(message, conversation, max_recursion)

        if stop_reason != "end_turn":
            # e.g. max_tokens: still show whatever text the model produced
            logging.warning("Model stopped with reason '%s'.", stop_reason)

        # Print every text block; the first content block is not guaranteed to be text
        text = "\n".join(
            block["text"] for block in message.get("content", []) if "text" in block
        )
        output.model_response(text if text else "Agent returned an empty response")
        return True

    def _handle_tool_use(
        self, model_response, conversation, max_recursion=MAX_RECURSIONS
    ):
        """
        Handles the tool use case by invoking the specified tool and sending the tool's response back to Bedrock.
        The tool response is appended to the conversation, and the conversation is sent back to Amazon Bedrock for further processing.

        :param model_response: The model's response containing the tool use request.
        :param conversation: The conversation history.
        :param max_recursion: The maximum number of recursive calls allowed.
        :return: The result of processing the model's follow-up response (see _process_model_response).
        """
        # Initialize an empty list of tool results
        tool_results = []

        # The model's response can consist of multiple content blocks
        for content_block in model_response["content"]:
            if "text" in content_block:
                # If the content block contains text, print it to the console
                output.model_response(content_block["text"])

            if "toolUse" in content_block:
                # If the content block is a tool use request, forward it to the tool
                tool_response = self._invoke_tool(content_block["toolUse"])
                content = tool_response["content"]

                # The tool output (rows list or error dict) is wrapped under a "rows" key
                tool_result = {
                    "toolUseId": tool_response["toolUseId"],
                    "content": [{"json": {'rows': content}}],
                }

                # Flag failures explicitly so the model can tell errors from empty results
                if isinstance(content, dict) and "error" in content:
                    tool_result["status"] = "error"

                tool_results.append({"toolResult": tool_result})

        # Embed the tool results in a new user message
        message = {"role": "user", "content": tool_results}

        # Append the new message to the ongoing conversation
        conversation.append(message)

        # Send the conversation to Amazon Bedrock
        response = self._send_conversation_to_bedrock(conversation)

        # Recursively handle the model's response until the model has returned
        # its final response or the recursion counter has reached 0
        return self._process_model_response(response, conversation, max_recursion - 1)

    def _invoke_tool(self, payload):
        """
        Invokes the specified tool with the given payload and returns the tool's response.
        If the requested tool does not exist, an error message is returned.

        :param payload: The payload containing the tool name and input data.
        :return: A dict with the ``toolUseId`` and the tool's response (or an error dict) as ``content``.
        """
        tool_name = payload["name"]

        if tool_name == "Sparql_Tool":
            input_data = payload["input"]
            output.tool_use(tool_name, input_data)

            # Invoke the sparql tool with the input data provided by the model
            response = exectue_sparql(input_data)
        else:
            error_message = (
                f"The requested tool with name '{tool_name}' does not exist."
            )
            response = {"error": "true", "message": error_message}

        return {"toolUseId": payload["toolUseId"], "content": response}

    @staticmethod
    def _get_user_input(prompt="Your location info request"):
        """
        Prompts the user for input and returns the user's response.
        Returns None if the user enters 'x' to exit. Empty or whitespace-only input
        re-prompts with a more explicit hint.

        :param prompt: The prompt to display to the user.
        :return: The user's input or None if the user chooses to exit.
        """
        while True:
            output.separator()
            user_input = input(f"{prompt} (x to exit): ")
            stripped = user_input.strip()

            if stripped == "":
                prompt = "Please enter your location info request, e.g. the name of a location in the graph"
                continue

            if stripped.lower() == "x":
                return None

            return user_input


if __name__ == "__main__":
    tool_use_demo = GraphToolDemo()
    tool_use_demo.run()

