## Unlock insights for Amazon Security Lake data using Generative AI leveraging Amazon Bedrock

This Jupyter Notebook demonstrates the ability to generate SQL queries with user provided natural language inputs and how that can be accomplished with the assistance of the LangChain framework. It shows how you can utilize Agents, and Tools to work with Amazon Security Lake data.

LangChain is a flexible framework that can integrate with a variety of LLMs. This Notebook was written with LangChain version 0.0.345(and langchain_experimental version: 0.0.43) using the "anthropic.claude-v2" model from Amazon Bedrock.

Also, be sure to install the requirements below:

In [None]:
!pip install -r requirements.txt --quiet

#Restart Kernel to use packages
import os
os._exit(00)

In [1]:
import langchain_experimental, langchain
import matplotlib, pandas

print("langchain.__version__: ", langchain.__version__)
print("langchain_experimental.__version__: ", langchain_experimental.__version__)

langchain.__version__:  0.1.11
langchain_experimental.__version__:  0.0.52


## Connect to Security Lake database using SQLAlchemy

In [47]:
import os
ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
REGION_NAME = os.environ.get('REGION_NAME', 'us-east-1')
REGION_FMT = REGION_NAME.replace("-","_")

In [48]:
print(REGION_NAME)

ap-southeast-2


In [60]:
from langchain import SQLDatabase
from sqlalchemy import create_engine, text

#Amazon Security Lake Database
SCHEMA_NAME = f"amazon_security_lake_glue_db_{REGION_FMT}"

#S3 Staging location for Athena query output results and this will be created by deploying the Cloud Formation stack
S3_STAGING_DIR = f's3://athena-gen-ai-bucket-results-{ACCOUNT_ID}/output/'

#AWS region where the Amazon Security lake database is created


engine_athena = create_engine(
    "awsathena+rest://@athena.{}.amazonaws.com:443/{}?s3_staging_dir={}".
    format(REGION_NAME, SCHEMA_NAME, S3_STAGING_DIR)
)

athena_db = SQLDatabase(engine_athena)
db = athena_db

In [63]:
strquery = "SELECT * FROM amazon_security_lake_table_ap_southeast_2_cloud_trail_mgmt_2_0 limit 10"
with engine_athena.connect() as connection:
    result = connection.execute(text(strquery))
    for row in result:
        print(row)

OperationalError: (pyathena.error.OperationalError) TABLE_NOT_FOUND: line 1:15: Table 'awsdatacatalog.amazon_security_lake_glue_db_ap_southeast_2.amazon_security_lake_table_ap_southeast_2_cloud_trail_mgmt_2_0' does not exist
[SQL: SELECT * FROM amazon_security_lake_table_ap_southeast_2_cloud_trail_mgmt_2_0 limit 10]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

## Define LLM and endpoint url to invoke model, we will be using claude-v2 from Anthropic available within Amazon Bedrock

Claude v2 is Anthropic's most powerful model, which excels at a wide range of tasks from sophisticated dialogue and creative content generation to detailed instruction following.
There is also another faster and cheaper model available from Anthropic which is Claude Instant v1.2.

In [33]:
from langchain_community.chat_models import BedrockChat
import os

model_id= "anthropic.claude-3-sonnet-20240229-v1:0"

llm = BedrockChat(
    model_id=model_id,
    
    # Do not neet to provide - defaults to this notebook's region -- https://api.python.langchain.com/en/latest/llms/langchain.llms.bedrock.Bedrock.html#langchain.llms.bedrock.Bedrock.region_name
    # region_name=region_name,
    endpoint_url=f"https://bedrock-runtime.{REGION_NAME}.amazonaws.com",
)

llm.model_kwargs = {'temperature':0.0,
                    'top_k':0,
                    'max_tokens': 4096}

## Provide list of tools for Agent

### Create Custom tools

Tools are interfaces that an agent can use to interact. Here we will be using SQL and Python tools to help agent determine the right action.

In [34]:
from langchain.tools.sql_database.tool import InfoSQLDatabaseTool, QuerySQLDataBaseTool

class InfoSQLDatabaseTool_custom(InfoSQLDatabaseTool):
    name= "sql_db_schema_and_sample_rows"
    description= '\n    Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables.    \n\n    Example Input: "table1, table2, table3"\n    '
    
    def _run(self, tables: list[str]) -> str:
        list_tables= tables.replace(' ', '').split(',')
        
        schema_str= ''
        for table in list_tables:
            schema_rows_str= super()._run(table)
            row_str= schema_rows_str[schema_rows_str.find('/*'):]   
            
            schema_rows= QuerySQLDataBaseTool(db=db)._run(f"SHOW CREATE TABLE `{table}`")[3:-4]
            schema_rows_formatted=schema_rows.replace("',), ('  ", '\n').replace("',), ('", '\n').replace("',), (\"  ", '\n').replace(', ",), ("  ', '\n')
            
            schema_str+= row_str + '\n' + schema_rows_formatted + '\n\n'
        
        return schema_str.strip()

In [35]:
from langchain.tools.sql_database.tool import QuerySQLDataBaseTool
QuerySQLDataBaseTool_desc= '\n    Input to this tool is a detailed and correct SQL query, output is a result from the database.\n    This tool gives access to a real databse.\n    If the query does not return anything or return blank results, it means the query is correct and returned 0 rows.\n    If the query is not correct, an error message will be returned.\n    If an error is returned, re-examine the database using the `sql_db_schema_and_sample_rows` tool, rewrite the query, check the query, and try again.\n    '
import time

class QuerySQLDatabaseTool_custom(QuerySQLDataBaseTool):
    name= "sql_db_query"
    description= QuerySQLDataBaseTool_desc
        
    def _run(self, query: str) -> str:
        print()
        print('*'*10)            
        print("Query passed to sql_db_query tool by llm: \n", query)
        print('*'*10)
        print()
        
        return super()._run(query.strip())

### Initialize tools and create a list

In [36]:
from langchain.tools.sql_database.tool import ListSQLDatabaseTool, QuerySQLCheckerTool
from langchain_experimental.tools import PythonREPLTool

tools = [
QuerySQLDatabaseTool_custom(db=db, description= QuerySQLDataBaseTool_desc),
ListSQLDatabaseTool(db=db),
PythonREPLTool(),
InfoSQLDatabaseTool_custom(db=db),
]


## Custom output parser

Use this to ensure Claude via Bedrock replies to be consistent with the agent

In [37]:
claude_instructions_for_agent = """To use a tool, please use the following format:\n\nThought: Do I need to use a tool? Yes\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n\nWhen you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:\n\n{ai_prefix}:[your response here]"""
print(claude_instructions_for_agent)

To use a tool, please use the following format:

Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action

When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:

{ai_prefix}:[your response here]


In [38]:
from langchain.memory import ConversationBufferMemory
from langchain.agents import initialize_agent,AgentType,AgentOutputParser
from langchain.schema import AgentAction,AgentFinish
from langchain.memory import ConversationBufferMemory
from typing import Union
import re

class CustomConvoOutputParser(AgentOutputParser):
    """Output parser for the conversational agent."""

    ai_prefix: str = "AI"
    """Prefix to use before AI output."""

    def get_format_instructions(self) -> str:
        return claude_instructions_for_agent

    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        regex = r"Action: (.*?)[\n]*Action Input:[\s+]*([\S\s]*)"
        match = re.search(regex, text)
        if not match:
            return AgentFinish(
                {"output": text.split(f"{self.ai_prefix}:")[-1].strip()}, text
            )
        action = match.group(1)
        action_input = match.group(2)
        return AgentAction(action.strip().replace('\n', ' '), action_input.replace('\n', ' ').strip(" ").strip('"'), text)

    @property
    def _type(self) -> str:
        return "conversational"


## Adding Conversation Buffer Memory

You can also load messages into a BufferMemory instance by creating and passing in a ChatHistory object. This lets you easily pick up state from past conversations.

In [39]:
from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory(memory_key="chat_history")

## Initialize the Agent

Agents use an LLM to determine which actions to take and in what order. An action can either be using a tool and observing its output, or returning to the user.

In [40]:
from langchain.agents import initialize_agent

conversational_agent = initialize_agent(
    agent="conversational-react-description",
    tools=tools,
    llm= llm,
    verbose=True,  # Show its work. Set this to False if you're only interested in the final output
    # return_direct=True,  # Return the results without sending back to the LLM. False by default
    max_iterations=None,
    memory=memory,
    handle_parsing_errors=False,
    agent_kwargs={'format_instructions':claude_instructions_for_agent,'output_parser':CustomConvoOutputParser()}
)

## Provide instructions to the Agent on how to use Tools.

In [41]:
conversational_agent.agent.llm_chain.prompt.template= conversational_agent.agent.llm_chain.prompt.template[conversational_agent.agent.llm_chain.prompt.template.find("TOOLS:\n------"):]

In [42]:
phrase= "To use a tool"

index= conversational_agent.agent.llm_chain.prompt.template.find(phrase)

primer= "\nINSTRUCTIONS:\n-------------\n\n"

conversational_agent.agent.llm_chain.prompt.template= conversational_agent.agent.llm_chain.prompt.template[:index] + primer + conversational_agent.agent.llm_chain.prompt.template[index:]


In [43]:
phrase= "Begin!"

primer= '''\n\nFor the questions being asked, ALWAYS use all the tools in a sequence defined in the <sequence> tags without skipping tools to generate an answer\n\n<sequence> sql_db_list_tables -> sql_db_schema_and_sample_rows -> sql_db_query </sequence>\n\n-ALWAYS generate a SQL Query after examining the database using the `sql_db_schema_and_sample_rows` tool.\n-Execute the SQL Query using the `sql_db_query` tool.\n-NEVER generate results without querying the database using the `sql_db_query` tool. Execute all steps using tools, without pausing for input from user.\n-ALWAYS generate an answer after examining `Observation` from tool's response.\n\nPay attention to SQL Queries generated.\n- Do not use colon `:` in the SQL Query. It causes this error "Error: (sqlalchemy.exc.InvalidRequestError) A value is required for bind parameter".\n- Avoid using aliases(`as` clause) in the SQL Query.\n- When querying column of `string` type, use single quotes ' in SQL Query for casting to string.\n- ALWAYS use the GROUP BY clause for columns you want to query.\n- Don't use table JOIN, unless you absolutely have to.\n- Do not use backtick ` in the SQL Query.\n\nONLY when asked to generate figure/charts/plots for results, generate code to show the figure/charts/plots. Then execute the generated code using the Python_REPL tool to make sure the code successfully generates the figure.\n\n\n'''

index= conversational_agent.agent.llm_chain.prompt.template.find(phrase)

conversational_agent.agent.llm_chain.prompt.template= conversational_agent.agent.llm_chain.prompt.template[:index] + primer + conversational_agent.agent.llm_chain.prompt.template[index:]



In [44]:
print(conversational_agent.agent.llm_chain.prompt.template)

TOOLS:
------

Assistant has access to the following tools:

> sql_db_query: 
    Input to this tool is a detailed and correct SQL query, output is a result from the database.
    This tool gives access to a real databse.
    If the query does not return anything or return blank results, it means the query is correct and returned 0 rows.
    If the query is not correct, an error message will be returned.
    If an error is returned, re-examine the database using the `sql_db_schema_and_sample_rows` tool, rewrite the query, check the query, and try again.
    
> sql_db_list_tables: Input is an empty string, output is a comma-separated list of tables in the database.
> Python_REPL: A Python shell. Use this to execute python commands. Input should be a valid python command. If you want to see the output of a value, you should print it out with `print(...)`.
> sql_db_schema_and_sample_rows: 
    Input to this tool is a comma-separated list of tables, output is the schema and sample rows for

In [45]:
# ignore alternative between human and assistant warnings from Claude
import warnings
warnings.filterwarnings('ignore')

## A Security Threat Hunter's converation with the agent

Provide the question in the input dialog box and hit enter. To break from the loop you can type exit and hit enter. Use up and down arrows on your keyboard to view previous questions.

In [46]:
while True:
    user_input = input("")
    if user_input=='exit': break
    print(conversational_agent.run(user_input))
    print()
    print('-'*100)

 What datasources are avaialble in Amazon Security Lake




[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find out what datasources are available in Amazon Security Lake, I first need to get a list of tables in the database.
Action: sql_db_list_tables
Action Input: 
[0m
Observation: [33;1m[1;3m[0m
Thought:[32;1m[1;3mThought: To find the datasources available in Amazon Security Lake, I first need to get the list of tables in the database and examine their schemas.
Action: sql_db_list_tables
Action Input: 
[0m
Observation: [33;1m[1;3m[0m
Thought:[32;1m[1;3mThought: To find the datasources available in Amazon Security Lake, I first need to get the list of tables in the database and examine their schemas.
Action: sql_db_list_tables
Action Input: 
[0m
Observation: [33;1m[1;3m[0m
Thought:[32;1m[1;3mThought: To find the datasources available in Amazon Security Lake, I first need to get the list of tables in the database and examine their schemas.
Action: sql_db_list_tables
Action Input: 
[0m
Observation: 

 exit


## Concluding thoughts

The example use case and run in this Notebook are a product of several prompt instruction trials to combat errors we encountered in using this tool.

The agent can sporidically generate fabricated answers without using tools, prompt the agent to use tools. (ex: "use tools to answer the questions"). Clearing memory might help further (cell: Adding Conversation Buffer Memory)

The PythonREPL tool is currently being utilized to generate code only, and separately run in a cell to show/save plots.

Try different models hosted in bedrock!