
# Natural Language Querying of data in S3 with Athena and Generative AI (Text-to-SQL)


In this sample notebook, you'll see how generative AI can simplify querying and analyzing data stored in Amazon S3 using Amazon Athena and the AWS Glue Data Catalog. Instead of writing complex SQL queries by hand, you describe your analysis requirements in plain English and let a generative AI model produce the corresponding Athena SQL queries.

Athena is an interactive query service that lets analysts query data in S3 using standard SQL. The Glue Data Catalog stores the table definitions and schemas for that data, which is what allows Athena to query it. Even so, writing SQL queries can be challenging, especially for complex analysis requirements.

This notebook illustrates how generative AI can bridge that gap. It covers:

1. Overview of text-to-SQL capabilities using GenAI models
2. Utilizing the Glue Catalog table definitions
3. Generating and executing Athena SQL queries from natural language descriptions

## Prerequisites

You will need the following to execute the code in this notebook:

- Access to Athena (query execution), S3 (read and write access, since the notebook writes Parquet files to your bucket), the Glue Data Catalog (database and tables), and Bedrock (LLMs).
- The Claude 3 Sonnet model enabled on Bedrock. You can read more about model access here: https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html
- An S3 bucket to store the data files.

### Installing the required libraries

In [1]:
pip install awswrangler pandas boto3

Collecting awswrangler
  Downloading awswrangler-3.7.3-py3-none-any.whl.metadata (17 kB)
Downloading awswrangler-3.7.3-py3-none-any.whl (378 kB)
Installing collected packages: awswrangler
Successfully installed awswrangler-3.7.3
Note: you may need to restart the kernel to use updated packages.



## Data Preparation

Data preparation and exploration are crucial aspects of any generative AI application. For this Text-to-SQL workshop, we begin by loading sample data files and creating a data catalog, which together lay the foundation for querying the data and extracting insights with SQL.

The required Python libraries, AWS SDK for pandas (awswrangler), pandas, and Boto3, were installed in the previous step. AWS SDK for pandas is a Python library that simplifies the interaction between Python and the AWS ecosystem, providing a high-level API for a wide range of AWS services, including Amazon S3, Athena, Glue, Redshift, and DynamoDB. The library abstracts the complexity of AWS service integrations, streamlines common data engineering tasks, and integrates with other popular data science and machine learning libraries. For more information and references, visit the [AWS Wrangler GitHub repository](https://github.com/awslabs/aws-data-wrangler).


#### Dataset

We will use a sample e-commerce sales dataset, located in the dataset folder.


#### Create Database and Tables in Glue Catalog

You can learn more about Glue Catalog here: https://docs.aws.amazon.com/prescriptive-guidance/latest/serverless-etl-aws-glue/aws-glue-data-catalog.html

We will use the AWS SDK for pandas (awswrangler) library to interact with the Glue Data Catalog and retrieve a list of all databases.

In [2]:
import awswrangler as wr
import boto3
import pandas as pd

# List all databases in the Glue Data Catalog
databases = wr.catalog.databases()
print(databases)

Empty DataFrame
Columns: [Database, Description]
Index: []



The sample code below reads the customers.csv file, creates a new database and table in the Glue Data Catalog, writes the data to an S3 location as a Parquet dataset, and then retrieves the first 10 rows of the table using an Athena SQL query.

Make sure you replace "<BUCKET_NAME>" with the name of a bucket in your account.

In [7]:
# Create catalog database: workshop_test
# Create first table (customers) in the database

bucket = "<BUCKET_NAME>"  # Replace with the name of a bucket in your account
path = f"s3://{bucket}/data/"

# Read local csv file customers.csv in to a DataFrame
df = pd.read_csv("dataset/customers.csv")

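# Check if the database workshop_test exists.
# Compare against the Database column only; testing membership on
# databases.values (a 2D array) triggers a warning.
if "workshop_test" not in databases["Database"].tolist():
    wr.catalog.create_database("workshop_test")
    print(wr.catalog.databases())
else:
    print("Database workshop_test already exists")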

# List all tables in the database workshop_test and print the table count
existing_tables = wr.catalog.tables(database="workshop_test")

print(f"There are {len(existing_tables)} tables in the database workshop_test")


print("Creating table customers in the database workshop_test")
# Create table customers in the database workshop_test
desc = "Table with list of customers"
param = {"source": "Customer Details Table", "class": "e-commerce"}

comments = {
    "customer_id": "Unique customer ID.",
    "first_name": "Customer first name.",
    "last_name": "Customer last name.",
    "email_id": "Customer email ID.",
    "phone_num": "Customer phone number.",
}

res = wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/customers/",
    dataset=True,
    database="workshop_test",
    table="customers",
    mode="overwrite",
    glue_table_settings=wr.typing.GlueTableSettings(description=desc, parameters=param, columns_comments=comments),
)

print("Table customers created in the database workshop_test")


# Read table customers from the database workshop_test
table = wr.catalog.table(database="workshop_test", table="customers")
print(table)


print("Records in the table customers")

# Run a sample query on the table customers
df = wr.athena.read_sql_query("SELECT * FROM customers LIMIT 10", database="workshop_test")
print(df)

        Database Description
0  workshop_test            
There are 0 tables in the database workshop_test
Creating table customers in the database workshop_test
Table customers created in the database workshop_test
   Column Name    Type  Partition                 Comment
0  customer_id  bigint      False     Unique customer ID.
1   first_name  string      False    Customer first name.
2    last_name  string      False     Customer last name.
3     email_id  string      False      Customer email ID.
4    phone_num  string      False  Customer phone number.
Records in the table customers
   customer_id first_name last_name               email_id     phone_num
0            1       John       Doe    johndoe@example.com  123-456-7890
1            2       Jane     Smith  janesmith@example.com  123-456-7891
2            3        Jim      Bean    jimbean@example.com  123-456-7892
3            8     Sophia      Loki    sophial@example.com  123-456-7897
4            9      Bruce    Stumps     

In [8]:
# Create tables for all the csv files in the dataset folder

file_names = ["orders.csv", "products.csv", "orderdetails.csv", "payments.csv", "shipments.csv", "reviews.csv"]

comments_dict = {
    "customers": {
        "customer_id": "Unique customer ID.",
        "first_name": "Customer first name.",
        "last_name": "Customer last name.",
        "email_id": "Customer email ID.",
        "phone_num": "Customer phone number.",
    },
    "orderdetails": {
        "orderdetailid": "Unique order detail ID.",
        "orderid": "Unique order ID.",
        "productid": "Unique product ID.",
        "quantity": "Quantity of product ordered.",
        "price": "Price of product.",
    },
    "orders": {
        "orderid": "Unique order ID.",
        "customerid": "Unique customer ID.",
        "orderdate": "Order date.",
        "totalamount": "Total order amount.",
    },
    "payments": {
        "paymentid": "Unique payment ID.",
        "orderid": "Unique order ID.",
        "paymenttype": "Type of payment.",
        "amount": "Payment amount.",
        "paymentdate": "Payment date.",
        "status": "Payment status.",
    },
    "products": {
        "productid": "Unique product ID.",
        "productname": "Product name.",
        "price": "Product price.",
        "category": "Product category.",
        "stock": "Product stock.",
    },
    "reviews": {
        "reviewid": "Unique review ID.",
        "productid": "Unique product ID.",
        "customerid": "Unique customer ID.",
        "rating": "Product rating.",
        "comment": "Review comment.",
        "reviewdate": "Review date.",
    },
    "shipments": {
        "shipmentid": "Unique shipment ID.",
        "orderid": "Unique order ID.",
        "status": "Shipment status.",
        "estimateddelivery": "Estimated delivery date.",
    },
}

print("Creating tables for all the csv files in the dataset folder")

for file_name in file_names:
    table_name = file_name.split(".")[0]
    df = pd.read_csv(f"dataset/{file_name}")
    res = wr.s3.to_parquet(
        df=df,
        path=f"s3://{bucket}/{table_name}/",
        dataset=True,
        database="workshop_test",
        table=table_name,
        mode="overwrite",
        glue_table_settings=wr.typing.GlueTableSettings(
            description=f"Table with list of {table_name}.",
            parameters={"source": f"{table_name} Table", "class": "e-commerce"},
            columns_comments=comments_dict[table_name],
        ),
    )
    print(f"Table {table_name} created in the database workshop_test")
    

Creating tables for all the csv files in the dataset folder
Table orders created in the database workshop_test
Table products created in the database workshop_test
Table orderdetails created in the database workshop_test
Table payments created in the database workshop_test
Table shipments created in the database workshop_test
Table reviews created in the database workshop_test
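
The loop above derives each table name from its file name and then looks up the column comments by that name, so a file without a matching entry in the comments dictionary would fail with a KeyError partway through the load. The following is a minimal sketch of a pre-flight check. The helper names table_name_for and missing_comment_tables, and the example file returns.csv, are hypothetical and not part of the notebook.

```python
from pathlib import Path


def table_name_for(file_name: str) -> str:
    """Derive a table name from a CSV file name, e.g. 'orders.csv' -> 'orders'."""
    return Path(file_name).stem.lower()


def missing_comment_tables(file_names, comments_by_table):
    """Return the table names that have no entry in the comments dictionary."""
    return [
        table_name_for(name)
        for name in file_names
        if table_name_for(name) not in comments_by_table
    ]


# Hypothetical inputs that mirror the shape of file_names and comments_dict.
example_files = ["orders.csv", "products.csv", "returns.csv"]
example_comments = {"orders": {}, "products": {}}

print(missing_comment_tables(example_files, example_comments))
```

Running a check like this before the loop reports every missing table at once instead of stopping at the first one after some tables have already been written.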



### Check created tables and retrieve schemas

In [9]:
import pprint

# New Database in the Glue Data Catalog
database = "workshop_test"

# List all tables in the database workshop_test
all_tables = wr.catalog.tables(database=database)

# Get list of all table names
tables = all_tables["Table"].tolist()

print("List of all tables in the database workshop_test")
print(tables)


# Get schema for all tables
print("Schema for all tables in the database workshop_test")

all_schemas = {}
for table in tables:
    schema_str = wr.catalog.get_table_types(database=database, table=table)
    all_schemas[table] = schema_str

pprint.pprint(all_schemas)



List of all tables in the database workshop_test
['customers', 'orderdetails', 'orders', 'payments', 'products', 'reviews', 'shipments']
Schema for all tables in the database workshop_test
{'customers': {'customer_id': 'bigint',
               'email_id': 'string',
               'first_name': 'string',
               'last_name': 'string',
               'phone_num': 'string'},
 'orderdetails': {'orderdetailid': 'bigint',
                  'orderid': 'bigint',
                  'price': 'double',
                  'productid': 'bigint',
                  'quantity': 'bigint'},
 'orders': {'customerid': 'bigint',
            'orderdate': 'string',
            'orderid': 'bigint',
            'totalamount': 'double'},
 'payments': {'amount': 'double',
              'orderid': 'bigint',
              'paymentdate': 'string',
              'paymentid': 'bigint',
              'paymenttype': 'string',
              'status': 'string'},
 'products': {'category': 'string',
              'pri

#### Validate data load by fetching top 2 records from each table

In [10]:
all_tables = wr.catalog.tables(database=database)
tables = all_tables["Table"].tolist()
print(tables)

for table in tables:
    # Fetch only the first 2 rows of each table
    data = wr.athena.read_sql_query(f"SELECT * FROM {table} LIMIT 2", database=database)

    print(f"First 2 rows of the table {table}")
    print(data)

['customers', 'orderdetails', 'orders', 'payments', 'products', 'reviews', 'shipments']
First 2 rows of the table customers
   customer_id first_name last_name               email_id     phone_num
0            1       John       Doe    johndoe@example.com  123-456-7890
1            2       Jane     Smith  janesmith@example.com  123-456-7891
First 2 rows of the table orderdetails
   orderdetailid  orderid  productid  quantity   price
0              1        1          1         2   25.99
1              2        2          4         1  699.99
First 2 rows of the table orders
   orderid  customerid   orderdate  totalamount
0        1           1  2024-04-01        85.97
1        2           2  2024-04-01       699.99
First 2 rows of the table payments
   paymentid  orderid  paymenttype  amount paymentdate     status
0          1        1  Credit Card   85.97  2024-04-01  Completed
1          2        2       PayPal  699.99  2024-04-01  Completed
First 2 rows of the table products
   produ



## Generative AI using Bedrock

Amazon Bedrock is a fully managed service that makes foundation models (FMs) from leading AI startups and Amazon available via an API, so you can choose the model best suited to your use case. With Bedrock's serverless experience, you can get started quickly, privately customize FMs with your own data, and integrate and deploy them into your applications using AWS tools without managing any infrastructure.

#### Create Bedrock Client

We will first create a boto3 Bedrock runtime client. We can use this client to issue API calls to the generative AI models available in Bedrock.

Note: Replace the profile_name value with the name of a profile configured in your development environment that has access to Bedrock.

In [11]:

import json

from botocore.config import Config

# Retry configuration for Bedrock API calls
retry_config = Config(
    retries={
        "max_attempts": 10,
        "mode": "standard",
    },
)

# region = "us-west-2"  # Use "us-west-2" if you are using an AWS-provided account as part of an event.
region = "us-east-1"
session = boto3.Session(region_name=region, profile_name="default")
bedrock_client = session.client(
    service_name="bedrock-runtime",
    config=retry_config,
)

print("boto3 Bedrock client successfully created!")

############################
# Note: You can also create a boto3 session with the credentials from environment variables
############################

# import os
#
# # Get the AWS credentials from environment variables
# AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
# AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
# AWS_SESSION_TOKEN = os.environ.get('AWS_SESSION_TOKEN', None)  # Optional, if using temporary credentials

# # Create a boto3 session with the credentials from environment variables
# session = boto3.Session(
#     region_name=region,
#     aws_access_key_id=AWS_ACCESS_KEY_ID,
#     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
#     aws_session_token=AWS_SESSION_TOKEN,  # Include this line only if using temporary credentials
# )

boto3 Bedrock client successfully created!



#### Creating a function to call Bedrock API

Next, we create a reusable function that uses the client created above to call the Claude 3 Sonnet model on Bedrock. The function takes the prompt and the sampling temperature as arguments.

In [12]:
def call_bedrock_claude_3(prompt_text, temperature):
    model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        # Pass the temperature through; without this the argument is ignored
        "temperature": temperature,
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt_text}],
            }
        ],
    }
    response = bedrock_client.invoke_model(body=json.dumps(body), modelId=model_id)

    # Parse the JSON response body and return the text of the first content block
    response_body = json.loads(response["body"].read())
    return response_body["content"][0]["text"]
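
The request and response handling in the function above can be exercised without calling Bedrock. The sketch below separates the two steps so they can be checked offline. The helper names build_claude_request and extract_text are hypothetical, and the sample response is hand-written to mimic the shape the function reads (a content list of text blocks), not captured from a real call.

```python
import json


def build_claude_request(prompt_text, temperature=0.0, max_tokens=1000):
    """Build the JSON request body for a single-turn user message."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "messages": [
            {"role": "user", "content": [{"type": "text", "text": prompt_text}]}
        ],
    }
    return json.dumps(body)


def extract_text(response_bytes):
    """Concatenate the text of every text block in a response body."""
    payload = json.loads(response_bytes)
    return "".join(
        block["text"]
        for block in payload.get("content", [])
        if block.get("type") == "text"
    )


# Hand-written stand-in for the bytes read from the response body.
sample_response = json.dumps(
    {"content": [{"type": "text", "text": "SELECT 1;"}]}
).encode("utf-8")

print(extract_text(sample_response))
```

Joining all text blocks rather than reading only the first one is a design choice of this sketch; for the short single-block answers in this notebook both approaches return the same string.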



#### Test a sample bedrock call

In [13]:
# Send a simple prompt to the Claude 3 Sonnet model
prompt = "Hello..."

# Call Bedrock and get the response
response = call_bedrock_claude_3(prompt, 0.7)
print(response)

Hello! How can I assist you today?




## Text to SQL with Generative AI

In this module, we'll showcase how to use generative AI models like Claude 3 to automatically generate SQL queries for analyzing data in Amazon S3 using Athena. We'll provide the AI model with the table schemas from the Glue Catalog and a natural language question describing the desired analysis. The model will then generate the corresponding Athena SQL query, which we can execute to retrieve the results. We'll also build an interactive chat widget that allows you to enter questions in plain English and see the AI-generated SQL queries and their output displayed interactively.
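
The flow described above (schemas plus question in, SQL out, then execution) can be sketched as one small function with the model call and the query execution passed in as callables. The names text_to_sql, PROMPT_TEMPLATE, fake_generate, and fake_execute are hypothetical; the two fakes stand in for the Bedrock call and the Athena query so that the sketch runs without AWS access.

```python
from typing import Any, Callable, Dict, Tuple

# Illustrative template; the notebook defines its own prompt further down.
PROMPT_TEMPLATE = (
    "Given the following tables, write one SQL query that answers the question.\n\n"
    "Tables:\n{tables}\n\n"
    "Question:\n{question}\n\n"
    "SQL:"
)


def text_to_sql(
    question: str,
    schemas: Dict[str, Dict[str, str]],
    generate: Callable[[str], str],
    execute: Callable[[str], Any],
) -> Tuple[str, Any]:
    """Format the prompt, ask the model for SQL, run it, and return both."""
    prompt = PROMPT_TEMPLATE.format(tables=schemas, question=question)
    sql = generate(prompt).strip()
    return sql, execute(sql)


# Stand-ins so the sketch runs offline (assumptions, not real service calls).
def fake_generate(prompt: str) -> str:
    return "SELECT COUNT(*) AS total_customers FROM customers;"


def fake_execute(sql: str):
    return [{"total_customers": 10}]


example_schemas = {"customers": {"customer_id": "bigint"}}
sql, rows = text_to_sql(
    "What is the total number of customers?", example_schemas, fake_generate, fake_execute
)
print(sql)
print(rows)
```

In the notebook, generate could wrap call_bedrock_claude_3 and execute could wrap wr.athena.read_sql_query with the workshop database; keeping them as parameters makes it easy to swap models or test the prompt logic in isolation.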


#### Testing SQL generation and execution

We will start by creating a schemas dictionary that maps each table to its columns and their data types.


In [14]:
import pprint

database = "workshop_test"

# List all tables in the database workshop_test
all_tables = wr.catalog.tables(database=database)

tables = all_tables["Table"].tolist()

print("List of all tables in the database workshop_test")
print(tables)


# Get schema for all tables
print("Schema for all tables in the database workshop_test")

all_schemas = {}
for table in tables:
    schema_str = wr.catalog.get_table_types(database=database, table=table)
    all_schemas[table] = schema_str

pprint.pprint(all_schemas)






List of all tables in the database workshop_test
['customers', 'orderdetails', 'orders', 'payments', 'products', 'reviews', 'shipments']
Schema for all tables in the database workshop_test
{'customers': {'customer_id': 'bigint',
               'email_id': 'string',
               'first_name': 'string',
               'last_name': 'string',
               'phone_num': 'string'},
 'orderdetails': {'orderdetailid': 'bigint',
                  'orderid': 'bigint',
                  'price': 'double',
                  'productid': 'bigint',
                  'quantity': 'bigint'},
 'orders': {'customerid': 'bigint',
            'orderdate': 'string',
            'orderid': 'bigint',
            'totalamount': 'double'},
 'payments': {'amount': 'double',
              'orderid': 'bigint',
              'paymentdate': 'string',
              'paymentid': 'bigint',
              'paymenttype': 'string',
              'status': 'string'},
 'products': {'category': 'string',
              'pri

In [15]:
import json

with open("all_schemas.json", "w") as outfile: 
    json.dump(all_schemas, outfile)
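
The schemas dictionary is later inserted into the prompt as a raw Python dict. One alternative is to render it as DDL-style text, which is closer to how table definitions are usually written. The sketch below assumes a hypothetical helper named schemas_to_ddl and a small example dictionary; the notebook does not measure whether this format improves the generated SQL, so treat it as an option to experiment with.

```python
def schemas_to_ddl(schemas):
    """Render {table: {column: type}} as CREATE TABLE-style text, one block per table."""
    statements = []
    for table in sorted(schemas):
        columns = ",\n".join(
            f"  {name} {dtype}" for name, dtype in schemas[table].items()
        )
        statements.append(f"CREATE TABLE {table} (\n{columns}\n);")
    return "\n\n".join(statements)


# Hypothetical example with the same shape as all_schemas.
example = {"customers": {"customer_id": "bigint", "first_name": "string"}}
print(schemas_to_ddl(example))
```

The rendered text is for the prompt only; it is not meant to be executed as DDL against Athena.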

We can now create a simple prompt with placeholders for the tables (schemas) and the question. We then format the prompt by inserting the table schemas and the question, and invoke the Claude model on Bedrock to generate the SQL query. Finally, we execute the SQL query using Athena.

Note: We will use the call_bedrock_claude_3 function created in a previous cell to invoke the model.

In [16]:
# Simple SQL generation prompt
prompt = """Given the following list of tables, generate syntactically correct SQL query to answer the following question. \n\n
Tables: \n
{tables} \n\n

Question: \n
{question} \n\n

Strict Instructions: \n
- Always end with a semicolon. \n\n
- Only include the SQL query in the response. \n\n
- Always name all columns in the query. \n\n
SQL:
"""


question = "What is the total number of customers?"

# Format the prompt
prompt = prompt.format(tables=all_schemas, question=question)

print(prompt)

# Call Bedrock and get the response
sql = call_bedrock_claude_3(prompt, 0.7)
print(sql)

# Run the generated SQL query
df = wr.athena.read_sql_query(sql, database=database)
df

Given the following list of tables, generate syntactically correct SQL query to answer the following question. 


Tables: 

{'customers': {'customer_id': 'bigint', 'first_name': 'string', 'last_name': 'string', 'email_id': 'string', 'phone_num': 'string'}, 'orderdetails': {'orderdetailid': 'bigint', 'orderid': 'bigint', 'productid': 'bigint', 'quantity': 'bigint', 'price': 'double'}, 'orders': {'orderid': 'bigint', 'customerid': 'bigint', 'orderdate': 'string', 'totalamount': 'double'}, 'payments': {'paymentid': 'bigint', 'orderid': 'bigint', 'paymenttype': 'string', 'amount': 'double', 'paymentdate': 'string', 'status': 'string'}, 'products': {'productid': 'bigint', 'productname': 'string', 'price': 'double', 'category': 'string', 'stock': 'bigint'}, 'reviews': {'reviewid': 'bigint', 'productid': 'bigint', 'customerid': 'bigint', 'rating': 'bigint', 'comment': 'string', 'reviewdate': 'string'}, 'shipments': {'shipmentid': 'bigint', 'orderid': 'bigint', 'status': 'string', 'estimatedde

   total_customers
0               10
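
Because the SQL above comes straight from the model, it can be worth cleaning and checking it before handing it to Athena. The following is a minimal sketch, assuming hypothetical helpers clean_sql and is_read_only: the first strips an optional markdown code fence and the trailing semicolon, and the second accepts only a single SELECT or WITH statement. The keyword check is deliberately conservative (it would also reject a harmless query that contains one of the keywords inside a string literal) and is not a substitute for restrictive IAM permissions.

```python
import re

FENCE = "`" * 3  # markdown code fence marker

# Keywords that indicate a statement is not a plain read-only query.
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|merge|truncate|grant|revoke|msck|unload)\b",
    re.IGNORECASE,
)


def clean_sql(model_output: str) -> str:
    """Strip an optional code fence and the trailing semicolon from model output."""
    text = model_output.strip()
    if text.startswith(FENCE) and text.endswith(FENCE) and len(text) >= 2 * len(FENCE):
        inner = text[len(FENCE):-len(FENCE)]
        # Drop an optional language tag such as "sql" on the opening fence line.
        first_line, newline, rest = inner.partition("\n")
        text = rest if newline and first_line.strip().isalpha() else inner
    return text.strip().rstrip(";").strip()


def is_read_only(sql: str) -> bool:
    """Accept only a single SELECT/WITH statement with no write keywords."""
    statement = clean_sql(sql)
    if ";" in statement:
        return False  # more than one statement
    if not re.match(r"^\s*(select|with)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None


print(clean_sql(FENCE + "sql\nSELECT 1;\n" + FENCE))
print(is_read_only("SELECT COUNT(*) AS total_customers FROM customers;"))
print(is_read_only("DROP TABLE customers;"))
```

A query that fails the check could be logged and skipped, or sent back to the model with a request to produce a single SELECT statement.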
