# Structured RAG using Amazon Bedrock Knowledge Bases: End-to-End Example using Amazon Redshift

Structured RAG allows Amazon Bedrock Knowledge Bases customers to query structured data in Redshift using natural language, and receive natural language responses summarizing the data thereby providing an answer to the user question.

Using advanced natural language processing, Amazon Bedrock Knowledge Bases can transform natural language queries into SQL queries, allowing users to retrieve data directly from the source without the need to move or preprocess the data. To generate accurate SQL queries, Bedrock Knowledge Base leverages database schema, previous query history, and other contextual information that are provided about the data sources. For more details, please see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-structured.html).

This notebook provides sample code for building a Structured RAG using Amazon Bedrock Knowledge Bases with Redshift.

## Steps:
1. Create Knowledge Base execution role with necessary policies for accessing data from Amazon Redshift
2. Create a knowledge base with Structured database (Redshift database)
3. Create data source(s) within knowledge base
4. Start ingestion jobs using KB APIs which will read metadata about structured database
5. Once the metadata is extracted and ingested, then user can interact with Structured databases via Amazon Bedrock Knowledge Base APIs using Natural language query

## Prerequisites
This notebook requires:
- A Redshift serverless cluster with a workgroup [OR] Redshift provisioned cluster
- Your workgroup or cluster is already setup with your structured data ingested
- You've set-up the IAM Role [OR] Secrets manager with User Credentials [OR] the DB User

To read more details about prerequisites, see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-structured.html).


# Structured RAG using Amazon Bedrock Knowledge Bases: End-to-End Example using Amazon Redshift

Structured RAG allows Amazon Bedrock Knowledge Bases customers to query structured data in Redshift using natural language, and receive natural language responses summarizing the data thereby providing an answer to the user question.

Using advanced natural language processing, Amazon Bedrock Knowledge Bases can transform natural language queries into SQL queries, allowing users to retrieve data directly from the source without the need to move or preprocess the data. To generate accurate SQL queries, Bedrock Knowledge Base leverages database schema, previous query history, and other contextual information that are provided about the data sources. For more details, please see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-structured.html).

This notebook provides sample code for building a Structured RAG using Amazon Bedrock Knowledge Bases with Redshift.

## Steps:
1. Create Knowledge Base execution role with necessary policies for accessing data from Amazon Redshift
2. Create a knowledge base with Structured database (Redshift database)
3. Create data source(s) within knowledge base
4. Start ingestion jobs using KB APIs which will read metadata about structured database
5. Once the metadata is extracted and ingested, then user can interact with Structured databases via Amazon Bedrock Knowledge Base APIs using Natural language query

## Prerequisites
This notebook requires:
- A Redshift serverless cluster with a workgroup [OR] Redshift provisioned cluster
- Your workgroup or cluster is already setup with your structured data ingested
- You've set-up the IAM Role [OR] Secrets manager with User Credentials [OR] the DB User

To read more details about prerequisites, see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-structured.html).


In [None]:
# %pip install --upgrade pip --quiet
# %pip install -r ../requirements.txt --no-deps --quiet
# %pip install -r ../requirements.txt --upgrade --quiet
# %pip install --upgrade boto3
import boto3
print(boto3.__version__)


1.38.36


In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")


In [None]:
# %load_ext autoreload
# %autoreload 2
# import warnings
# warnings.filterwarnings('ignore')


In [None]:
import sys
import logging
from pathlib import Path

current_path = Path().resolve()
current_path = current_path.parent

if str(current_path) not in sys.path:
    sys.path.append(str(current_path))

# Print sys.path to verify
print(sys.path)

from utils.structured_knowledge_base import BedrockStructuredKnowledgeBase


['/Users/manojs/Documents/Code/samples/05-agentic-rag/2-unstructure-structured-rag_agent', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python312.zip', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/lib-dynload', '', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/site-packages', '/Users/manojs/Documents/Code/samples/05-agentic-rag']


Setup and initialize boto3 clients


In [None]:
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session(region_name='us-west-2')
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id


('us-west-2', '533267284022')

In [None]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"

knowledge_base_name = f"bedrock-sample-structured-kb-{suffix}"
knowledge_base_description = "Sample Structured KB"

foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"


In [None]:
# Configuration for Redshift resources
REDSHIFT_NAMESPACE = f'sds-ecommerce-{suffix}'
REDSHIFT_WORKGROUP = f'sds-ecommerce-wg-{suffix}'
REDSHIFT_DATABASE = f'sds-ecommerce'
S3_BUCKET = f'sds-ecommerce-redshift-{suffix}'

print(f"Redshift Namespace: {REDSHIFT_NAMESPACE}")
print(f"Redshift Workgroup: {REDSHIFT_WORKGROUP}")
print(f"Database: {REDSHIFT_DATABASE}")
print(f"S3 Bucket: {S3_BUCKET}")


Redshift Namespace: sds-ecommerce-0205647
Redshift Workgroup: sds-ecommerce-wg-0205647
Database: sds-ecommerce
S3 Bucket: sds-ecommerce-redshift-0205647


In [None]:
def create_iam_role_for_redshift():
    """Create IAM role for Redshift to access S3"""
    try:
        # Get account ID
        account_id = sts_client.get_caller_identity()['Account']
        
        # Create IAM role if it doesn't exist
        role_name = f'RedshiftS3AccessRole-{suffix}'
        try:
            role_response = iam_client.get_role(RoleName=role_name)
            print(f'Role {role_name} already exists')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
        except iam_client.exceptions.NoSuchEntityException:
            trust_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "redshift.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
            
            iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy)
            )
            
            # Attach necessary policies
            iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )
            
            print(f'Created role {role_name}')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
            
    except Exception as e:
        print(f'Error creating IAM role: {str(e)}')
        raise

# Initialize additional clients
import json
import os
iam_client = boto3.client('iam')
redshift_client = boto3.client('redshift-serverless', region_name=region)
redshift_data_client = boto3.client('redshift-data', region_name=region)

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")


Created role RedshiftS3AccessRole-0205647
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647
Role RedshiftS3AccessRole-0205647 already exists
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647


In [None]:
def create_redshift_namespace():
    """Create Redshift Serverless namespace"""
    try:
        # Check if namespace already exists
        try:
            response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
            print(f'Namespace {REDSHIFT_NAMESPACE} already exists')
            return response['namespace']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating namespace {REDSHIFT_NAMESPACE}...')
        
        # Create the namespace
        response = redshift_client.create_namespace(
            namespaceName=REDSHIFT_NAMESPACE,
            adminUsername='admin',
            adminUserPassword='TempPassword123!',  # Change this in production
            dbName=REDSHIFT_DATABASE,
            defaultIamRoleArn=redshift_role_arn,
            iamRoles=[redshift_role_arn]
        )
        
        print(f'Created namespace {REDSHIFT_NAMESPACE}')
        
        # Wait for namespace to be available
        print('Waiting for namespace to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                namespace_response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
                status = namespace_response['namespace']['status']
                if status == 'AVAILABLE':
                    print(f'Namespace {REDSHIFT_NAMESPACE} is now available')
                    return namespace_response['namespace']
                else:
                    print(f'Namespace status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking namespace status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for namespace, but proceeding...')
        return response['namespace']
        
    except Exception as e:
        print(f'Error creating namespace: {str(e)}')
        raise

# Create namespace
namespace = create_redshift_namespace()


Creating namespace sds-ecommerce-0205647...
Created namespace sds-ecommerce-0205647
Waiting for namespace to be available...
Namespace sds-ecommerce-0205647 is now available


In [None]:
def create_redshift_workgroup():
    """Create Redshift Serverless workgroup"""
    try:
        # Check if workgroup already exists
        try:
            response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
            print(f'Workgroup {REDSHIFT_WORKGROUP} already exists')
            return response['workgroup']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating workgroup {REDSHIFT_WORKGROUP}...')
        
        # Create the workgroup
        response = redshift_client.create_workgroup(
            workgroupName=REDSHIFT_WORKGROUP,
            namespaceName=REDSHIFT_NAMESPACE,
            baseCapacity=8,  # Minimum base capacity
            enhancedVpcRouting=False,
            publiclyAccessible=True,
            configParameters=[
                {
                    'parameterKey': 'enable_user_activity_logging',
                    'parameterValue': 'true'
                }
            ]
        )
        
        print(f'Created workgroup {REDSHIFT_WORKGROUP}')
        
        # Wait for workgroup to be available
        print('Waiting for workgroup to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                workgroup_response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
                status = workgroup_response['workgroup']['status']
                if status == 'AVAILABLE':
                    print(f'Workgroup {REDSHIFT_WORKGROUP} is now available')
                    return workgroup_response['workgroup']
                else:
                    print(f'Workgroup status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking workgroup status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for workgroup, but proceeding...')
        return response['workgroup']
        
    except Exception as e:
        print(f'Error creating workgroup: {str(e)}')
        raise

# Create workgroup
workgroup = create_redshift_workgroup()
workgroup_arn = workgroup['workgroupArn']
print(f"Workgroup ARN: {workgroup_arn}")


Creating workgroup sds-ecommerce-wg-0205647...
Created workgroup sds-ecommerce-wg-0205647
Waiting for workgroup to be available...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup sds-ecommerce-wg-0205647 is now available
Workgroup ARN: arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa


## 2 - Create S3 Bucket and Load Sample Data

We will create an S3 bucket to stage our sample e-commerce data before loading it into Redshift tables.


In [None]:
def create_s3_bucket():
    """Create S3 bucket for data staging"""
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        print(f'Bucket {S3_BUCKET} already exists')
    except:
        try:
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=S3_BUCKET)
            else:
                s3_client.create_bucket(
                    Bucket=S3_BUCKET,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            print(f'Created bucket {S3_BUCKET}')
        except Exception as e:
            print(f'Error creating bucket: {str(e)}')
            raise

# Create S3 bucket
create_s3_bucket()


Created bucket sds-ecommerce-redshift-0205647


In [None]:
def upload_sample_data():
    """Upload sample CSV files to S3"""
    data_files = ['orders.csv', 'order_items.csv', 'payments.csv', 'reviews.csv']
    sds_directory = 'sample_data'
    
    print("Uploading sample data files to S3...")
    files_found = 0
    
    for file_name in data_files:
        local_path = os.path.join(sds_directory, file_name)
        if os.path.exists(local_path):
            # Get file size for informational purposes
            file_size = os.path.getsize(local_path)
            file_size_mb = file_size / (1024 * 1024)
            
            s3_client.upload_file(local_path, S3_BUCKET, file_name)
            print(f'Uploaded {file_name} ({file_size_mb:.1f} MB) to S3')
            files_found += 1
        else:
            print(f'Warning: {local_path} not found')
    
    if files_found == len(data_files):
        print(f"Successfully uploaded all {files_found} data files to S3")
    else:
        print(f"Only {files_found} out of {len(data_files)} files were found and uploaded")

# Upload sample data
upload_sample_data()


Uploading sample data files to S3...
Uploaded orders.csv (1.8 MB) to S3
Uploaded order_items.csv (1.3 MB) to S3
Uploaded payments.csv (0.8 MB) to S3
Uploaded reviews.csv (0.5 MB) to S3
Successfully uploaded all 4 data files to S3


In [None]:
# Execute the required SQL commands to grant database access to Bedrock execution role
print("🔧 Executing SQL commands to grant database access to Bedrock execution role...")

bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Step 1: Create the IAM user in Redshift database
create_user_sql = f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;'

try:
    print(f"Creating user: IAMR:{bedrock_role_name}")
    run_redshift_statement(create_user_sql)
    print("✅ IAM user created successfully!")
except Exception as e:
    if "already exists" in str(e).lower():
        print("ℹ️ User already exists, continuing...")
    else:
        print(f"❌ Error creating user: {str(e)}")
        raise

# Step 2: Grant USAGE permission on public schema
grant_usage_sql = f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting USAGE on schema public to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_usage_sql)
    print("✅ USAGE permission granted successfully!")
except Exception as e:
    print(f"❌ Error granting USAGE permission: {str(e)}")
    raise

# Step 3: Grant SELECT permission on all tables in public schema
grant_select_sql = f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting SELECT on all tables to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_select_sql)
    print("✅ SELECT permissions granted successfully!")
except Exception as e:
    print(f"❌ Error granting SELECT permissions: {str(e)}")
    raise

print("\\n🎉 All database permissions have been granted to the Bedrock execution role!")
print("The Knowledge Base should now be able to access the database tables.")


🔧 Executing SQL commands to grant database access to Bedrock execution role...
Creating user: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: cb83ac9a-aa81-41ad-8bde-91ceb92cfc0c
Statement status: SUBMITTED, waiting...
Statement status: STARTED, waiting...
Statement status: STARTED, waiting...
Error executing statement: Statement failed: ERROR: user "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" already exists
ℹ️ User already exists, continuing...
Granting USAGE on schema public to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: c948acbc-0201-440d-b2e1-fee3521a65ac
Statement status: PICKED, waiting...
Statement completed successfully
✅ USAGE permission granted successfully!
Granting SELECT on all tables to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: 6c413ca2-d25d-41bb-ae99-d28f3506ee19
Statement status: PICKED, waiting...
Statement completed su

In [None]:
# Retry the ingestion job now that permissions are fixed
print("🔄 Retrying the ingestion job with proper permissions...")

try:
    # Start a new ingestion job
    response = knowledge_base.start_ingestion_job()
    print("✅ New ingestion job started successfully!")
    print(f"Ingestion Job Response: {response}")
    
    # Wait a bit for the job to process
    import time
    print("⏳ Waiting for ingestion job to complete...")
    time.sleep(30)
    
    # Check the ingestion job status
    job_id = response['ingestionJobId']
    job_status = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=response['dataSourceId'],
        ingestionJobId=job_id
    )
    
    print(f"\\n📊 Ingestion Job Status: {job_status['ingestionJob']['status']}")
    if job_status['ingestionJob']['status'] == 'COMPLETE':
        print("🎉 Ingestion job completed successfully!")
    elif job_status['ingestionJob']['status'] == 'IN_PROGRESS':
        print("⏳ Ingestion job is still in progress. Wait a few more minutes.")
    else:
        print(f"❌ Ingestion job status: {job_status['ingestionJob']['status']}")
        if 'failureReasons' in job_status['ingestionJob']:
            print(f"Failure reasons: {job_status['ingestionJob']['failureReasons']}")
            
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🔄 Retrying the ingestion job with proper permissions...
job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'YBQXYQU4WL',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 7, 12, 41, 597571, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 7, 12, 44, 860633, tzinfo=tzutc())}
✅ New ingestion job started successfully!
Ingestion Job Response: None
⏳ Waiting for ingestion job to complete...
❌ Error starting ingestion job: 'NoneType' object is not subscriptable


TypeError: 'NoneType' object is not subscriptable

In [None]:
# Helper function to check ingestion job status
def check_ingestion_status(kb_id, data_source_id=None, job_id=None):
    """Check the status of ingestion jobs for a knowledge base"""
    try:
        # If no data_source_id provided, get the first one
        if not data_source_id:
            data_sources = bedrock_agent_client.list_data_sources(
                knowledgeBaseId=kb_id,
                maxResults=10
            )
            if data_sources['dataSourceSummaries']:
                data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
            else:
                print("❌ No data sources found for this knowledge base")
                return
        
        # List ingestion jobs
        ingestion_jobs = bedrock_agent_client.list_ingestion_jobs(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id,
            maxResults=5
        )
        
        print(f"📋 Recent ingestion jobs for Knowledge Base {kb_id}:")
        print("=" * 80)
        
        for i, job in enumerate(ingestion_jobs['ingestionJobSummaries'][:3]):
            status_emoji = {
                'COMPLETE': '✅',
                'IN_PROGRESS': '⏳', 
                'FAILED': '❌',
                'STARTING': '🔄'
            }.get(job['status'], '❓')
            
            print(f"{i+1}. Job ID: {job['ingestionJobId']}")
            print(f"   Status: {status_emoji} {job['status']}")
            print(f"   Started: {job['startedAt']}")
            print(f"   Updated: {job['updatedAt']}")
            print()
            
    except Exception as e:
        print(f"❌ Error checking ingestion status: {str(e)}")

# Check current ingestion status
print("🔍 Checking current ingestion job status...")
check_ingestion_status(kb_id)


In [None]:
# Fix the IAM service role permissions and retry ingestion
print("🔧 Checking and fixing IAM service role permissions...")

# Get the current service role
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name
role_arn = f"arn:aws:iam::{account_id}:role/{bedrock_role_name}"

print(f"Service Role: {bedrock_role_name}")
print(f"Service Role ARN: {role_arn}")

# Create a comprehensive policy that matches AWS documentation requirements
enhanced_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RedshiftDataAPIStatementPermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:GetStatementResult",
                "redshift-data:DescribeStatement", 
                "redshift-data:CancelStatement"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "redshift-data:statement-owner-iam-userid": "${aws:userid}"
                }
            }
        },
        {
            "Sid": "RedshiftDataAPIExecutePermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:ExecuteStatement"
            ],
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "RedshiftServerlessGetCredentials",
            "Effect": "Allow",
            "Action": "redshift-serverless:GetCredentials",
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "SqlWorkbenchAccess",
            "Effect": "Allow",
            "Action": [
                "sqlworkbench:GetSqlRecommendations",
                "sqlworkbench:PutSqlGenerationContext",
                "sqlworkbench:GetSqlGenerationContext", 
                "sqlworkbench:DeleteSqlGenerationContext"
            ],
            "Resource": "*"
        },
        {
            "Sid": "BedrockAccess",
            "Effect": "Allow",
            "Action": [
                "bedrock:GenerateQuery",
                "bedrock:InvokeModel",
                "bedrock:Retrieve",
                "bedrock:RetrieveAndGenerate"
            ],
            "Resource": "*"
        },
        {
            "Sid": "LoggingPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

# Create and attach the enhanced policy
enhanced_policy_name = f'EnhancedBedrockRedshiftPolicy-{suffix}'

try:
    # Create the enhanced policy
    enhanced_policy_response = iam_client.create_policy(
        PolicyName=enhanced_policy_name,
        PolicyDocument=json.dumps(enhanced_policy_document),
        Description='Enhanced policy for Bedrock Knowledge Base with Redshift access'
    )
    enhanced_policy_arn = enhanced_policy_response['Policy']['Arn']
    print(f"✅ Created enhanced policy: {enhanced_policy_name}")
    
    # Attach the enhanced policy to the role
    iam_client.attach_role_policy(
        RoleName=bedrock_role_name,
        PolicyArn=enhanced_policy_arn
    )
    print(f"✅ Attached enhanced policy to role: {bedrock_role_name}")
    
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"ℹ️ Enhanced policy {enhanced_policy_name} already exists")
    enhanced_policy_arn = f"arn:aws:iam::{account_id}:policy/{enhanced_policy_name}"
    try:
        iam_client.attach_role_policy(
            RoleName=bedrock_role_name,
            PolicyArn=enhanced_policy_arn
        )
        print(f"✅ Attached existing enhanced policy to role")
    except iam_client.exceptions.EntityAlreadyExistsException:
        print(f"ℹ️ Enhanced policy already attached to role")
except Exception as e:
    print(f"❌ Error creating/attaching enhanced policy: {str(e)}")

print("\\n🔄 Waiting 30 seconds for IAM changes to propagate...")
import time
time.sleep(30)


🔧 Checking and fixing IAM service role permissions...
Service Role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Service Role ARN: arn:aws:iam::533267284022:role/AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
✅ Created enhanced policy: EnhancedBedrockRedshiftPolicy-0205647
✅ Attached enhanced policy to role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
\n🔄 Waiting 30 seconds for IAM changes to propagate...


In [None]:
# Start ingestion job directly using Bedrock Agent client
print("🚀 Starting ingestion job with enhanced permissions...")

try:
    # Get the data source ID
    data_sources = bedrock_agent_client.list_data_sources(
        knowledgeBaseId=kb_id,
        maxResults=10
    )
    
    if not data_sources['dataSourceSummaries']:
        print("❌ No data sources found for this knowledge base")
    else:
        data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
        print(f"📋 Using data source ID: {data_source_id}")
        
        # Start the ingestion job directly
        start_job_response = bedrock_agent_client.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id
        )
        
        job = start_job_response["ingestionJob"]
        print(f"✅ Ingestion job started successfully!")
        print(f"📊 Job ID: {job['ingestionJobId']}")
        print(f"📊 Initial Status: {job['status']}")
        
        # Monitor the job status
        job_id = job['ingestionJobId']
        max_attempts = 20
        wait_time = 15
        
        print(f"\\n⏳ Monitoring ingestion job progress...")
        
        for attempt in range(max_attempts):
            try:
                get_job_response = bedrock_agent_client.get_ingestion_job(
                    knowledgeBaseId=kb_id,
                    dataSourceId=data_source_id,
                    ingestionJobId=job_id
                )
                
                current_job = get_job_response["ingestionJob"]
                status = current_job['status']
                
                status_emoji = {
                    'COMPLETE': '✅',
                    'IN_PROGRESS': '⏳',
                    'STARTING': '🔄', 
                    'FAILED': '❌',
                    'STOPPED': '⏹️'
                }.get(status, '❓')
                
                print(f"Attempt {attempt + 1}/{max_attempts}: {status_emoji} Status: {status}")
                
                if status == 'COMPLETE':
                    print("\\n🎉 Ingestion job completed successfully!")
                    print("🎯 Knowledge Base is now ready to answer queries!")
                    break
                elif status == 'FAILED':
                    print("\\n❌ Ingestion job failed!")
                    if 'failureReasons' in current_job:
                        print(f"Failure reasons: {current_job['failureReasons']}")
                    if 'statistics' in current_job:
                        print(f"Statistics: {current_job['statistics']}")
                    break
                elif status == 'STOPPED':
                    print("\\n⏹️ Ingestion job was stopped!")
                    break
                elif status in ['IN_PROGRESS', 'STARTING']:
                    if attempt < max_attempts - 1:
                        print(f"   ⏳ Waiting {wait_time} seconds before next check...")
                        time.sleep(wait_time)
                    else:
                        print("\\n⏰ Timeout reached. Job may still be running in background.")
                        print("Use the check_ingestion_status function to monitor progress.")
                
            except Exception as e:
                print(f"❌ Error checking job status: {str(e)}")
                if attempt < max_attempts - 1:
                    print(f"   🔄 Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print("\\n❌ Maximum retries reached for status check")
                    break
                
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🚀 Starting ingestion job with enhanced permissions...
📋 Using data source ID: EOZFBUZD6F
✅ Ingestion job started successfully!
📊 Job ID: JKTRBFLDGT
📊 Initial Status: STARTING
\n⏳ Monitoring ingestion job progress...
Attempt 1/20: ⏳ Status: IN_PROGRESS
   ⏳ Waiting 15 seconds before next check...
Attempt 2/20: ❌ Status: FAILED
\n❌ Ingestion job failed!


In [None]:
# Test the Knowledge Base once ingestion is successful
def test_knowledge_base():
    """Test the Knowledge Base with sample queries"""
    print("🧪 Testing Knowledge Base functionality...")
    
    # Test queries
    test_queries = [
        "How many orders are there in total?",
        "What is the average order value?", 
        "Which payment method is most popular?",
        "How many different products have been ordered?"
    ]
    
    foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
    
    for i, query in enumerate(test_queries, 1):
        print(f"\\n🔍 Test Query {i}: {query}")
        print("-" * 50)
        
        try:
            # Test with retrieve_and_generate
            response = bedrock_agent_runtime_client.retrieve_and_generate(
                input={"text": query},
                retrieveAndGenerateConfiguration={
                    "type": "KNOWLEDGE_BASE",
                    "knowledgeBaseConfiguration": {
                        'knowledgeBaseId': kb_id,
                        "modelArn": f"arn:aws:bedrock:{region}::foundation-model/{foundation_model}",
                        "retrievalConfiguration": {
                            "vectorSearchConfiguration": {
                                "numberOfResults": 5
                            } 
                        }
                    }
                }
            )
            
            print(f"✅ Response: {response['output']['text']}")
            
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            
        print()

# Note: Only run this after ingestion completes successfully
print("📝 Note: Use test_knowledge_base() function after ingestion completes")
print("Example: test_knowledge_base()")


📝 Note: Use test_knowledge_base() function after ingestion completes
Example: test_knowledge_base()


In [None]:
def wait_for_statement(statement_id):
    """Wait for a Redshift Data API statement to complete"""
    max_attempts = 30
    for attempt in range(max_attempts):
        try:
            response = redshift_data_client.describe_statement(Id=statement_id)
            status = response['Status']
            if status == 'FINISHED':
                return response
            elif status == 'FAILED':
                raise Exception(f"Statement failed: {response.get('Error', 'Unknown error')}")
            elif status == 'CANCELLED':
                raise Exception("Statement was cancelled")
            else:
                print(f"Statement status: {status}, waiting...")
                time.sleep(5)
        except Exception as e:
            if "Statement failed" in str(e) or "cancelled" in str(e):
                raise
            print(f"Error checking statement status: {str(e)}, retrying...")
            time.sleep(5)
    
    raise Exception("Timeout waiting for statement to complete")

def run_redshift_statement(sql_statement):
    """Execute a SQL statement in Redshift"""
    try:
        response = redshift_data_client.execute_statement(
            WorkgroupName=REDSHIFT_WORKGROUP,
            Database=REDSHIFT_DATABASE,
            Sql=sql_statement
        )
        statement_id = response['Id']
        print(f"Executing statement: {statement_id}")
        result = wait_for_statement(statement_id)
        print(f"Statement completed successfully")
        return result
    except Exception as e:
        print(f"Error executing statement: {str(e)}")
        raise


In [None]:
# Create tables in Redshift
def create_tables():
    """Create all necessary tables in Redshift"""
    
    # Orders table
    orders_sql = """
    CREATE TABLE IF NOT EXISTS orders (
        order_id VARCHAR(255) PRIMARY KEY,
        customer_id VARCHAR(255),
        order_total DECIMAL(10,2),
        order_status VARCHAR(50),
        payment_method VARCHAR(50),
        shipping_address TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    );
    """
    
    # Order Items table
    order_items_sql = """
    CREATE TABLE IF NOT EXISTS order_items (
        order_item_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        product_id VARCHAR(255),
        quantity INTEGER,
        price DECIMAL(10,2)
    );
    """
    
    # Payments table
    payments_sql = """
    CREATE TABLE IF NOT EXISTS payments (
        payment_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        customer_id VARCHAR(255),
        amount DECIMAL(10,2),
        payment_method VARCHAR(50),
        payment_status VARCHAR(50),
        created_at DATE
    );
    """
    
    # Reviews table
    reviews_sql = """
    CREATE TABLE IF NOT EXISTS reviews (
        review_id VARCHAR(255) PRIMARY KEY,
        product_id VARCHAR(255),
        customer_id VARCHAR(255),
        rating INTEGER,
        created_at DATE
    );
    """
    
    tables = {
        'orders': orders_sql,
        'order_items': order_items_sql,
        'payments': payments_sql,
        'reviews': reviews_sql
    }
    
    for table_name, sql in tables.items():
        print(f"Creating table: {table_name}")
        run_redshift_statement(sql)
        print(f"Created table: {table_name}")
        print("-------------")

# Create tables
create_tables()


Creating table: orders
Executing statement: 4d669868-ea32-4ce0-9180-c977ebc99514
Statement status: PICKED, waiting...
Statement completed successfully
Created table: orders
-------------
Creating table: order_items
Executing statement: ea378983-2fac-43f2-b3ce-9c8d016ef05d
Statement status: PICKED, waiting...
Statement completed successfully
Created table: order_items
-------------
Creating table: payments
Executing statement: a75c6c7a-9c38-4b72-8c74-82563692520b
Statement status: SUBMITTED, waiting...
Statement completed successfully
Created table: payments
-------------
Creating table: reviews
Executing statement: e227ecbd-4eb6-4a0b-a9d9-f41049822560
Statement status: PICKED, waiting...
Statement completed successfully
Created table: reviews
-------------


In [None]:
# Load data from S3 into Redshift tables
def load_data_from_s3():
    """Load data from S3 CSV files into Redshift tables"""
    
    tables_and_files = {
        'orders': 'orders.csv',
        'order_items': 'order_items.csv',
        'payments': 'payments.csv',
        'reviews': 'reviews.csv'
    }
    
    for table_name, file_name in tables_and_files.items():
        print(f"Loading data into {table_name} from {file_name}")
        
        copy_sql = f"""
        COPY {table_name}
        FROM 's3://{S3_BUCKET}/{file_name}'
        IAM_ROLE '{redshift_role_arn}'
        CSV
        IGNOREHEADER 1
        DELIMITER ','
        REGION '{region}';
        """
        
        try:
            run_redshift_statement(copy_sql)
            print(f"Loaded data into {table_name}")
        except Exception as e:
            print(f"Error loading data into {table_name}: {str(e)}")

# Load data from S3
load_data_from_s3()


Loading data into orders from orders.csv
Executing statement: 0a23526b-aa4a-4d34-9458-a6d0f8c0433a
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into orders
Loading data into order_items from order_items.csv
Executing statement: 84d54756-be6c-43c2-9bb8-4f47f5657c37
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into order_items
Loading data into payments from payments.csv
Executing statement: 8e48260c-1a43-45c8-a626-4ca55af5866c
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into payments
Loading data into reviews from reviews.csv
Executing statement: 6ba97e21-83cd-4e22-843c-068c0bfeb201
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into reviews


In [None]:
# Knowledge Base Configuration for IAM Role + Redshift Serverless WorkGroup
kbConfigParam = {
    "type": "SQL",
    "sqlKnowledgeBaseConfiguration": {
        "type": "REDSHIFT",
        "redshiftConfiguration": {
            "storageConfigurations": [{
                "type": "REDSHIFT",
                "redshiftConfiguration": {
                    "databaseName": REDSHIFT_DATABASE
                }
            }],
            "queryEngineConfiguration": {
                "type": "SERVERLESS",
                "serverlessConfiguration": {
                    "workgroupArn": workgroup_arn,
                    "authConfiguration": {
                        "type": "IAM"
                    }
                }
            }
        }
    }
}


In [None]:
# Create the Knowledge Base using IAM Role + Redshift Serverless WorkGroup
print("Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...")

knowledge_base = BedrockStructuredKnowledgeBase(
    kb_name=knowledge_base_name,
    kb_description=knowledge_base_description,
    workgroup_arn=workgroup_arn,
    kbConfigParam=kbConfigParam,
    generation_model=foundation_model,
    suffix=suffix
)

print("Knowledge Base created successfully!")
print(f"Knowledge Base ID: {knowledge_base.get_knowledge_base_id()}")


[2025-06-20 21:04:12,179] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...


[2025-06-20 21:04:12,950] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Step 1 - Creating Knowledge Base Execution Role (AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647) and Policies
Step 2 - Creating Knowledge Base
{ 'createdAt': datetime.datetime(2025, 6, 21, 4, 4, 13, 800686, tzinfo=tzutc()),
  'description': 'Sample Structured KB',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:533267284022:knowledge-base/WCNCTKFCKY',
  'knowledgeBaseConfiguration': { 'sqlKnowledgeBaseConfiguration': { 'redshiftConfiguration': { 'queryEngineConfiguration': { 'serverlessConfiguration': { 'authConfiguration': { 'type': 'IAM'},
                                                                                                                                                           'workgroupArn': 'arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa'},
                                                                                                                              'type': 'SERVERLESS'},
               

In [None]:
# Get the Bedrock execution role name
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Display the SQL commands that need to be executed in Redshift Query Editor
print("Execute the following SQL commands in your Redshift Query Editor:")
print("=" * 70)
print(f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;')
print(f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";')
print(f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";')
print("=" * 70)
print("\\nThese commands will:")
print("1. Create an IAM-based database user for the Bedrock execution role")
print("2. Grant USAGE permission on the public schema")
print("3. Grant SELECT permission on all tables in the public schema")
print("\\nNote: Adjust the schema and table permissions based on your specific requirements.")


Execute the following SQL commands in your Redshift Query Editor:
CREATE USER "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" WITH PASSWORD DISABLE;
GRANT USAGE ON SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
\nThese commands will:
1. Create an IAM-based database user for the Bedrock execution role
2. Grant USAGE permission on the public schema
3. Grant SELECT permission on all tables in the public schema
\nNote: Adjust the schema and table permissions based on your specific requirements.


In [None]:
# ensure that the kb is available
time.sleep(60)
# sync knowledge base
knowledge_base.start_ingestion_job()
# keep the kb_id for invocation later in the invoke request
kb_id = knowledge_base.get_knowledge_base_id()
# %store kb_id


job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'HOYDC9A0DV',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 4, 5, 39, 955341, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 4, 5, 43, 167059, tzinfo=tzutc())}
'WCNCTKFCKY'


In [None]:
query = "How many orders are there in total?"


In [None]:
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults": 5
                } 
            }
        }
    }
)

print(response['output']['text'], end='\n'*2)


ValidationException: An error occurred (ValidationException) when calling the RetrieveAndGenerate operation: No metadata found. Please trigger an ingestion and try again. (Service: BedrockAgentRuntime, Status Code: 400, Request ID: 4846aded-b9bf-403d-ad46-00cc9556ccc6) (SDK Attempt Count: 1)

In [None]:
response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id, 
    nextToken='string',
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 5,
        } 
    },
    retrievalQuery={
        "text": query
    }
)


In [None]:
import json
import pandas as pd

# Function to extract retrieved results from Retrieve API response into a pandas dataframe.
def response_print(retrieve_resp):
    # Extract the retrievalResults list
    retrieval_results = retrieve_resp['retrievalResults']

    # Dictionary to store the extracted data
    extracted_data = {}

    # Iterate through each item in retrievalResults
    for item in retrieval_results:
        row = item['content']['row']
        for col in row:
            column_name = col['columnName']
            column_value = col['columnValue']
            
            # If this column hasn't been seen before, create a new list for it
            if column_name not in extracted_data:
                extracted_data[column_name] = []
            
            # Append the value to the appropriate list
            extracted_data[column_name].append(column_value)

    # Create a DataFrame from the extracted data
    df = pd.DataFrame(extracted_data)
    return df
    
# Display the Retrieved results records
df = response_print(response_ret)
print(df.head())


In [None]:
query_response = bedrock_agent_runtime_client.generate_query(
    queryGenerationInput={
        "text": query,
        "type": "TEXT"
    },
    transformationConfiguration={
        "mode": "TEXT_TO_SQL",
        "textToSqlConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseArn": knowledge_base.knowledge_base['knowledgeBaseArn']
            }
        }
    }
)

generated_sql = query_response['queries'][0]['sql']
generated_sql


In [None]:
# Delete resources
# print("=============================== Deleting resources ==============================\n")
# knowledge_base.delete_kb(delete_iam_roles_and_policies=True)


In [None]:
# %pip install --upgrade pip --quiet
# %pip install -r ../requirements.txt --no-deps --quiet
# %pip install -r ../requirements.txt --upgrade --quiet
# %pip install --upgrade boto3
import boto3
print(boto3.__version__)


1.38.36


In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")


In [None]:
# %load_ext autoreload
# %autoreload 2
# import warnings
# warnings.filterwarnings('ignore')


In [None]:
import sys
import logging
from pathlib import Path

current_path = Path().resolve()
current_path = current_path.parent

if str(current_path) not in sys.path:
    sys.path.append(str(current_path))

# Print sys.path to verify
print(sys.path)

from utils.structured_knowledge_base import BedrockStructuredKnowledgeBase


['/Users/manojs/Documents/Code/samples/05-agentic-rag/2-unstructure-structured-rag_agent', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python312.zip', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/lib-dynload', '', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/site-packages', '/Users/manojs/Documents/Code/samples/05-agentic-rag']


Setup and initialize boto3 clients


In [None]:
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session(region_name='us-west-2')
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id


('us-west-2', '533267284022')

In [None]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"

knowledge_base_name = f"bedrock-sample-structured-kb-{suffix}"
knowledge_base_description = "Sample Structured KB"

foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"


In [None]:
# Configuration for Redshift resources
REDSHIFT_NAMESPACE = f'sds-ecommerce-{suffix}'
REDSHIFT_WORKGROUP = f'sds-ecommerce-wg-{suffix}'
REDSHIFT_DATABASE = f'sds-ecommerce'
S3_BUCKET = f'sds-ecommerce-redshift-{suffix}'

print(f"Redshift Namespace: {REDSHIFT_NAMESPACE}")
print(f"Redshift Workgroup: {REDSHIFT_WORKGROUP}")
print(f"Database: {REDSHIFT_DATABASE}")
print(f"S3 Bucket: {S3_BUCKET}")


Redshift Namespace: sds-ecommerce-0205647
Redshift Workgroup: sds-ecommerce-wg-0205647
Database: sds-ecommerce
S3 Bucket: sds-ecommerce-redshift-0205647


In [None]:
def create_iam_role_for_redshift():
    """Create IAM role for Redshift to access S3"""
    try:
        # Get account ID
        account_id = sts_client.get_caller_identity()['Account']
        
        # Create IAM role if it doesn't exist
        role_name = f'RedshiftS3AccessRole-{suffix}'
        try:
            role_response = iam_client.get_role(RoleName=role_name)
            print(f'Role {role_name} already exists')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
        except iam_client.exceptions.NoSuchEntityException:
            trust_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "redshift.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
            
            iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy)
            )
            
            # Attach necessary policies
            iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )
            
            print(f'Created role {role_name}')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
            
    except Exception as e:
        print(f'Error creating IAM role: {str(e)}')
        raise

# Initialize additional clients
import json
import os
iam_client = boto3.client('iam')
redshift_client = boto3.client('redshift-serverless', region_name=region)
redshift_data_client = boto3.client('redshift-data', region_name=region)

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")


Created role RedshiftS3AccessRole-0205647
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647
Role RedshiftS3AccessRole-0205647 already exists
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647


In [None]:
def create_redshift_namespace():
    """Create Redshift Serverless namespace"""
    try:
        # Check if namespace already exists
        try:
            response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
            print(f'Namespace {REDSHIFT_NAMESPACE} already exists')
            return response['namespace']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating namespace {REDSHIFT_NAMESPACE}...')
        
        # Create the namespace
        response = redshift_client.create_namespace(
            namespaceName=REDSHIFT_NAMESPACE,
            adminUsername='admin',
            adminUserPassword='TempPassword123!',  # Change this in production
            dbName=REDSHIFT_DATABASE,
            defaultIamRoleArn=redshift_role_arn,
            iamRoles=[redshift_role_arn]
        )
        
        print(f'Created namespace {REDSHIFT_NAMESPACE}')
        
        # Wait for namespace to be available
        print('Waiting for namespace to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                namespace_response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
                status = namespace_response['namespace']['status']
                if status == 'AVAILABLE':
                    print(f'Namespace {REDSHIFT_NAMESPACE} is now available')
                    return namespace_response['namespace']
                else:
                    print(f'Namespace status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking namespace status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for namespace, but proceeding...')
        return response['namespace']
        
    except Exception as e:
        print(f'Error creating namespace: {str(e)}')
        raise

# Create namespace
namespace = create_redshift_namespace()


Creating namespace sds-ecommerce-0205647...
Created namespace sds-ecommerce-0205647
Waiting for namespace to be available...
Namespace sds-ecommerce-0205647 is now available


In [None]:
def create_redshift_workgroup():
    """Create Redshift Serverless workgroup"""
    try:
        # Check if workgroup already exists
        try:
            response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
            print(f'Workgroup {REDSHIFT_WORKGROUP} already exists')
            return response['workgroup']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating workgroup {REDSHIFT_WORKGROUP}...')
        
        # Create the workgroup
        response = redshift_client.create_workgroup(
            workgroupName=REDSHIFT_WORKGROUP,
            namespaceName=REDSHIFT_NAMESPACE,
            baseCapacity=8,  # Minimum base capacity
            enhancedVpcRouting=False,
            publiclyAccessible=True,
            configParameters=[
                {
                    'parameterKey': 'enable_user_activity_logging',
                    'parameterValue': 'true'
                }
            ]
        )
        
        print(f'Created workgroup {REDSHIFT_WORKGROUP}')
        
        # Wait for workgroup to be available
        print('Waiting for workgroup to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                workgroup_response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
                status = workgroup_response['workgroup']['status']
                if status == 'AVAILABLE':
                    print(f'Workgroup {REDSHIFT_WORKGROUP} is now available')
                    return workgroup_response['workgroup']
                else:
                    print(f'Workgroup status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking workgroup status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for workgroup, but proceeding...')
        return response['workgroup']
        
    except Exception as e:
        print(f'Error creating workgroup: {str(e)}')
        raise

# Create workgroup
workgroup = create_redshift_workgroup()
workgroup_arn = workgroup['workgroupArn']
print(f"Workgroup ARN: {workgroup_arn}")


Creating workgroup sds-ecommerce-wg-0205647...
Created workgroup sds-ecommerce-wg-0205647
Waiting for workgroup to be available...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup sds-ecommerce-wg-0205647 is now available
Workgroup ARN: arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa


## 2 - Create S3 Bucket and Load Sample Data

We will create an S3 bucket to stage our sample e-commerce data before loading it into Redshift tables.


In [None]:
def create_s3_bucket():
    """Create S3 bucket for data staging"""
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        print(f'Bucket {S3_BUCKET} already exists')
    except:
        try:
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=S3_BUCKET)
            else:
                s3_client.create_bucket(
                    Bucket=S3_BUCKET,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            print(f'Created bucket {S3_BUCKET}')
        except Exception as e:
            print(f'Error creating bucket: {str(e)}')
            raise

# Create S3 bucket
create_s3_bucket()


Created bucket sds-ecommerce-redshift-0205647


In [None]:
def upload_sample_data():
    """Upload sample CSV files to S3"""
    data_files = ['orders.csv', 'order_items.csv', 'payments.csv', 'reviews.csv']
    sds_directory = 'sample_data'
    
    print("Uploading sample data files to S3...")
    files_found = 0
    
    for file_name in data_files:
        local_path = os.path.join(sds_directory, file_name)
        if os.path.exists(local_path):
            # Get file size for informational purposes
            file_size = os.path.getsize(local_path)
            file_size_mb = file_size / (1024 * 1024)
            
            s3_client.upload_file(local_path, S3_BUCKET, file_name)
            print(f'Uploaded {file_name} ({file_size_mb:.1f} MB) to S3')
            files_found += 1
        else:
            print(f'Warning: {local_path} not found')
    
    if files_found == len(data_files):
        print(f"Successfully uploaded all {files_found} data files to S3")
    else:
        print(f"Only {files_found} out of {len(data_files)} files were found and uploaded")

# Upload sample data
upload_sample_data()


Uploading sample data files to S3...
Uploaded orders.csv (1.8 MB) to S3
Uploaded order_items.csv (1.3 MB) to S3
Uploaded payments.csv (0.8 MB) to S3
Uploaded reviews.csv (0.5 MB) to S3
Successfully uploaded all 4 data files to S3


In [None]:
# Execute the required SQL commands to grant database access to Bedrock execution role
print("🔧 Executing SQL commands to grant database access to Bedrock execution role...")

bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Step 1: Create the IAM user in Redshift database
create_user_sql = f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;'

try:
    print(f"Creating user: IAMR:{bedrock_role_name}")
    run_redshift_statement(create_user_sql)
    print("✅ IAM user created successfully!")
except Exception as e:
    if "already exists" in str(e).lower():
        print("ℹ️ User already exists, continuing...")
    else:
        print(f"❌ Error creating user: {str(e)}")
        raise

# Step 2: Grant USAGE permission on public schema
grant_usage_sql = f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting USAGE on schema public to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_usage_sql)
    print("✅ USAGE permission granted successfully!")
except Exception as e:
    print(f"❌ Error granting USAGE permission: {str(e)}")
    raise

# Step 3: Grant SELECT permission on all tables in public schema
grant_select_sql = f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting SELECT on all tables to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_select_sql)
    print("✅ SELECT permissions granted successfully!")
except Exception as e:
    print(f"❌ Error granting SELECT permissions: {str(e)}")
    raise

print("\\n🎉 All database permissions have been granted to the Bedrock execution role!")
print("The Knowledge Base should now be able to access the database tables.")


🔧 Executing SQL commands to grant database access to Bedrock execution role...
Creating user: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: cb83ac9a-aa81-41ad-8bde-91ceb92cfc0c
Statement status: SUBMITTED, waiting...
Statement status: STARTED, waiting...
Statement status: STARTED, waiting...
Error executing statement: Statement failed: ERROR: user "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" already exists
ℹ️ User already exists, continuing...
Granting USAGE on schema public to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: c948acbc-0201-440d-b2e1-fee3521a65ac
Statement status: PICKED, waiting...
Statement completed successfully
✅ USAGE permission granted successfully!
Granting SELECT on all tables to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: 6c413ca2-d25d-41bb-ae99-d28f3506ee19
Statement status: PICKED, waiting...
Statement completed su

In [None]:
# Retry the ingestion job now that permissions are fixed
print("🔄 Retrying the ingestion job with proper permissions...")

try:
    # Start a new ingestion job
    response = knowledge_base.start_ingestion_job()
    print("✅ New ingestion job started successfully!")
    print(f"Ingestion Job Response: {response}")
    
    # Wait a bit for the job to process
    import time
    print("⏳ Waiting for ingestion job to complete...")
    time.sleep(30)
    
    # Check the ingestion job status
    job_id = response['ingestionJobId']
    job_status = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=response['dataSourceId'],
        ingestionJobId=job_id
    )
    
    print(f"\\n📊 Ingestion Job Status: {job_status['ingestionJob']['status']}")
    if job_status['ingestionJob']['status'] == 'COMPLETE':
        print("🎉 Ingestion job completed successfully!")
    elif job_status['ingestionJob']['status'] == 'IN_PROGRESS':
        print("⏳ Ingestion job is still in progress. Wait a few more minutes.")
    else:
        print(f"❌ Ingestion job status: {job_status['ingestionJob']['status']}")
        if 'failureReasons' in job_status['ingestionJob']:
            print(f"Failure reasons: {job_status['ingestionJob']['failureReasons']}")
            
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🔄 Retrying the ingestion job with proper permissions...
job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'YBQXYQU4WL',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 7, 12, 41, 597571, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 7, 12, 44, 860633, tzinfo=tzutc())}
✅ New ingestion job started successfully!
Ingestion Job Response: None
⏳ Waiting for ingestion job to complete...
❌ Error starting ingestion job: 'NoneType' object is not subscriptable


TypeError: 'NoneType' object is not subscriptable

In [None]:
# Helper function to check ingestion job status
def check_ingestion_status(kb_id, data_source_id=None, job_id=None):
    """Check the status of ingestion jobs for a knowledge base"""
    try:
        # If no data_source_id provided, get the first one
        if not data_source_id:
            data_sources = bedrock_agent_client.list_data_sources(
                knowledgeBaseId=kb_id,
                maxResults=10
            )
            if data_sources['dataSourceSummaries']:
                data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
            else:
                print("❌ No data sources found for this knowledge base")
                return
        
        # List ingestion jobs
        ingestion_jobs = bedrock_agent_client.list_ingestion_jobs(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id,
            maxResults=5
        )
        
        print(f"📋 Recent ingestion jobs for Knowledge Base {kb_id}:")
        print("=" * 80)
        
        for i, job in enumerate(ingestion_jobs['ingestionJobSummaries'][:3]):
            status_emoji = {
                'COMPLETE': '✅',
                'IN_PROGRESS': '⏳', 
                'FAILED': '❌',
                'STARTING': '🔄'
            }.get(job['status'], '❓')
            
            print(f"{i+1}. Job ID: {job['ingestionJobId']}")
            print(f"   Status: {status_emoji} {job['status']}")
            print(f"   Started: {job['startedAt']}")
            print(f"   Updated: {job['updatedAt']}")
            print()
            
    except Exception as e:
        print(f"❌ Error checking ingestion status: {str(e)}")

# Check current ingestion status
print("🔍 Checking current ingestion job status...")
check_ingestion_status(kb_id)


In [None]:
# Fix the IAM service role permissions and retry ingestion
print("🔧 Checking and fixing IAM service role permissions...")

# Get the current service role
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name
role_arn = f"arn:aws:iam::{account_id}:role/{bedrock_role_name}"

print(f"Service Role: {bedrock_role_name}")
print(f"Service Role ARN: {role_arn}")

# Create a comprehensive policy that matches AWS documentation requirements
enhanced_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RedshiftDataAPIStatementPermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:GetStatementResult",
                "redshift-data:DescribeStatement", 
                "redshift-data:CancelStatement"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "redshift-data:statement-owner-iam-userid": "${aws:userid}"
                }
            }
        },
        {
            "Sid": "RedshiftDataAPIExecutePermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:ExecuteStatement"
            ],
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "RedshiftServerlessGetCredentials",
            "Effect": "Allow",
            "Action": "redshift-serverless:GetCredentials",
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "SqlWorkbenchAccess",
            "Effect": "Allow",
            "Action": [
                "sqlworkbench:GetSqlRecommendations",
                "sqlworkbench:PutSqlGenerationContext",
                "sqlworkbench:GetSqlGenerationContext", 
                "sqlworkbench:DeleteSqlGenerationContext"
            ],
            "Resource": "*"
        },
        {
            "Sid": "BedrockAccess",
            "Effect": "Allow",
            "Action": [
                "bedrock:GenerateQuery",
                "bedrock:InvokeModel",
                "bedrock:Retrieve",
                "bedrock:RetrieveAndGenerate"
            ],
            "Resource": "*"
        },
        {
            "Sid": "LoggingPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

# Create and attach the enhanced policy
enhanced_policy_name = f'EnhancedBedrockRedshiftPolicy-{suffix}'

try:
    # Create the enhanced policy
    enhanced_policy_response = iam_client.create_policy(
        PolicyName=enhanced_policy_name,
        PolicyDocument=json.dumps(enhanced_policy_document),
        Description='Enhanced policy for Bedrock Knowledge Base with Redshift access'
    )
    enhanced_policy_arn = enhanced_policy_response['Policy']['Arn']
    print(f"✅ Created enhanced policy: {enhanced_policy_name}")
    
    # Attach the enhanced policy to the role
    iam_client.attach_role_policy(
        RoleName=bedrock_role_name,
        PolicyArn=enhanced_policy_arn
    )
    print(f"✅ Attached enhanced policy to role: {bedrock_role_name}")
    
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"ℹ️ Enhanced policy {enhanced_policy_name} already exists")
    enhanced_policy_arn = f"arn:aws:iam::{account_id}:policy/{enhanced_policy_name}"
    try:
        iam_client.attach_role_policy(
            RoleName=bedrock_role_name,
            PolicyArn=enhanced_policy_arn
        )
        print(f"✅ Attached existing enhanced policy to role")
    except iam_client.exceptions.EntityAlreadyExistsException:
        print(f"ℹ️ Enhanced policy already attached to role")
except Exception as e:
    print(f"❌ Error creating/attaching enhanced policy: {str(e)}")

print("\\n🔄 Waiting 30 seconds for IAM changes to propagate...")
import time
time.sleep(30)


🔧 Checking and fixing IAM service role permissions...
Service Role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Service Role ARN: arn:aws:iam::533267284022:role/AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
✅ Created enhanced policy: EnhancedBedrockRedshiftPolicy-0205647
✅ Attached enhanced policy to role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
\n🔄 Waiting 30 seconds for IAM changes to propagate...


In [None]:
# Start ingestion job directly using Bedrock Agent client
print("🚀 Starting ingestion job with enhanced permissions...")

try:
    # Get the data source ID
    data_sources = bedrock_agent_client.list_data_sources(
        knowledgeBaseId=kb_id,
        maxResults=10
    )
    
    if not data_sources['dataSourceSummaries']:
        print("❌ No data sources found for this knowledge base")
    else:
        data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
        print(f"📋 Using data source ID: {data_source_id}")
        
        # Start the ingestion job directly
        start_job_response = bedrock_agent_client.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id
        )
        
        job = start_job_response["ingestionJob"]
        print(f"✅ Ingestion job started successfully!")
        print(f"📊 Job ID: {job['ingestionJobId']}")
        print(f"📊 Initial Status: {job['status']}")
        
        # Monitor the job status
        job_id = job['ingestionJobId']
        max_attempts = 20
        wait_time = 15
        
        print(f"\\n⏳ Monitoring ingestion job progress...")
        
        for attempt in range(max_attempts):
            try:
                get_job_response = bedrock_agent_client.get_ingestion_job(
                    knowledgeBaseId=kb_id,
                    dataSourceId=data_source_id,
                    ingestionJobId=job_id
                )
                
                current_job = get_job_response["ingestionJob"]
                status = current_job['status']
                
                status_emoji = {
                    'COMPLETE': '✅',
                    'IN_PROGRESS': '⏳',
                    'STARTING': '🔄', 
                    'FAILED': '❌',
                    'STOPPED': '⏹️'
                }.get(status, '❓')
                
                print(f"Attempt {attempt + 1}/{max_attempts}: {status_emoji} Status: {status}")
                
                if status == 'COMPLETE':
                    print("\\n🎉 Ingestion job completed successfully!")
                    print("🎯 Knowledge Base is now ready to answer queries!")
                    break
                elif status == 'FAILED':
                    print("\\n❌ Ingestion job failed!")
                    if 'failureReasons' in current_job:
                        print(f"Failure reasons: {current_job['failureReasons']}")
                    if 'statistics' in current_job:
                        print(f"Statistics: {current_job['statistics']}")
                    break
                elif status == 'STOPPED':
                    print("\\n⏹️ Ingestion job was stopped!")
                    break
                elif status in ['IN_PROGRESS', 'STARTING']:
                    if attempt < max_attempts - 1:
                        print(f"   ⏳ Waiting {wait_time} seconds before next check...")
                        time.sleep(wait_time)
                    else:
                        print("\\n⏰ Timeout reached. Job may still be running in background.")
                        print("Use the check_ingestion_status function to monitor progress.")
                
            except Exception as e:
                print(f"❌ Error checking job status: {str(e)}")
                if attempt < max_attempts - 1:
                    print(f"   🔄 Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print("\\n❌ Maximum retries reached for status check")
                    break
                
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🚀 Starting ingestion job with enhanced permissions...
📋 Using data source ID: EOZFBUZD6F
✅ Ingestion job started successfully!
📊 Job ID: JKTRBFLDGT
📊 Initial Status: STARTING
\n⏳ Monitoring ingestion job progress...
Attempt 1/20: ⏳ Status: IN_PROGRESS
   ⏳ Waiting 15 seconds before next check...
Attempt 2/20: ❌ Status: FAILED
\n❌ Ingestion job failed!


In [None]:
# Test the Knowledge Base once ingestion is successful
def test_knowledge_base():
    """Test the Knowledge Base with sample queries"""
    print("🧪 Testing Knowledge Base functionality...")
    
    # Test queries
    test_queries = [
        "How many orders are there in total?",
        "What is the average order value?", 
        "Which payment method is most popular?",
        "How many different products have been ordered?"
    ]
    
    foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
    
    for i, query in enumerate(test_queries, 1):
        print(f"\\n🔍 Test Query {i}: {query}")
        print("-" * 50)
        
        try:
            # Test with retrieve_and_generate
            response = bedrock_agent_runtime_client.retrieve_and_generate(
                input={"text": query},
                retrieveAndGenerateConfiguration={
                    "type": "KNOWLEDGE_BASE",
                    "knowledgeBaseConfiguration": {
                        'knowledgeBaseId': kb_id,
                        "modelArn": f"arn:aws:bedrock:{region}::foundation-model/{foundation_model}",
                        "retrievalConfiguration": {
                            "vectorSearchConfiguration": {
                                "numberOfResults": 5
                            } 
                        }
                    }
                }
            )
            
            print(f"✅ Response: {response['output']['text']}")
            
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            
        print()

# Note: Only run this after ingestion completes successfully
print("📝 Note: Use test_knowledge_base() function after ingestion completes")
print("Example: test_knowledge_base()")


📝 Note: Use test_knowledge_base() function after ingestion completes
Example: test_knowledge_base()


In [None]:
def wait_for_statement(statement_id):
    """Wait for a Redshift Data API statement to complete"""
    max_attempts = 30
    for attempt in range(max_attempts):
        try:
            response = redshift_data_client.describe_statement(Id=statement_id)
            status = response['Status']
            if status == 'FINISHED':
                return response
            elif status == 'FAILED':
                raise Exception(f"Statement failed: {response.get('Error', 'Unknown error')}")
            elif status == 'CANCELLED':
                raise Exception("Statement was cancelled")
            else:
                print(f"Statement status: {status}, waiting...")
                time.sleep(5)
        except Exception as e:
            if "Statement failed" in str(e) or "cancelled" in str(e):
                raise
            print(f"Error checking statement status: {str(e)}, retrying...")
            time.sleep(5)
    
    raise Exception("Timeout waiting for statement to complete")

def run_redshift_statement(sql_statement):
    """Execute a SQL statement in Redshift"""
    try:
        response = redshift_data_client.execute_statement(
            WorkgroupName=REDSHIFT_WORKGROUP,
            Database=REDSHIFT_DATABASE,
            Sql=sql_statement
        )
        statement_id = response['Id']
        print(f"Executing statement: {statement_id}")
        result = wait_for_statement(statement_id)
        print(f"Statement completed successfully")
        return result
    except Exception as e:
        print(f"Error executing statement: {str(e)}")
        raise


In [None]:
# Create tables in Redshift
def create_tables():
    """Create all necessary tables in Redshift"""
    
    # Orders table
    orders_sql = """
    CREATE TABLE IF NOT EXISTS orders (
        order_id VARCHAR(255) PRIMARY KEY,
        customer_id VARCHAR(255),
        order_total DECIMAL(10,2),
        order_status VARCHAR(50),
        payment_method VARCHAR(50),
        shipping_address TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    );
    """
    
    # Order Items table
    order_items_sql = """
    CREATE TABLE IF NOT EXISTS order_items (
        order_item_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        product_id VARCHAR(255),
        quantity INTEGER,
        price DECIMAL(10,2)
    );
    """
    
    # Payments table
    payments_sql = """
    CREATE TABLE IF NOT EXISTS payments (
        payment_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        customer_id VARCHAR(255),
        amount DECIMAL(10,2),
        payment_method VARCHAR(50),
        payment_status VARCHAR(50),
        created_at DATE
    );
    """
    
    # Reviews table
    reviews_sql = """
    CREATE TABLE IF NOT EXISTS reviews (
        review_id VARCHAR(255) PRIMARY KEY,
        product_id VARCHAR(255),
        customer_id VARCHAR(255),
        rating INTEGER,
        created_at DATE
    );
    """
    
    tables = {
        'orders': orders_sql,
        'order_items': order_items_sql,
        'payments': payments_sql,
        'reviews': reviews_sql
    }
    
    for table_name, sql in tables.items():
        print(f"Creating table: {table_name}")
        run_redshift_statement(sql)
        print(f"Created table: {table_name}")
        print("-------------")

# Create tables
create_tables()


Creating table: orders
Executing statement: 4d669868-ea32-4ce0-9180-c977ebc99514
Statement status: PICKED, waiting...
Statement completed successfully
Created table: orders
-------------
Creating table: order_items
Executing statement: ea378983-2fac-43f2-b3ce-9c8d016ef05d
Statement status: PICKED, waiting...
Statement completed successfully
Created table: order_items
-------------
Creating table: payments
Executing statement: a75c6c7a-9c38-4b72-8c74-82563692520b
Statement status: SUBMITTED, waiting...
Statement completed successfully
Created table: payments
-------------
Creating table: reviews
Executing statement: e227ecbd-4eb6-4a0b-a9d9-f41049822560
Statement status: PICKED, waiting...
Statement completed successfully
Created table: reviews
-------------


In [None]:
# Load data from S3 into Redshift tables
def load_data_from_s3():
    """Load data from S3 CSV files into Redshift tables"""
    
    tables_and_files = {
        'orders': 'orders.csv',
        'order_items': 'order_items.csv',
        'payments': 'payments.csv',
        'reviews': 'reviews.csv'
    }
    
    for table_name, file_name in tables_and_files.items():
        print(f"Loading data into {table_name} from {file_name}")
        
        copy_sql = f"""
        COPY {table_name}
        FROM 's3://{S3_BUCKET}/{file_name}'
        IAM_ROLE '{redshift_role_arn}'
        CSV
        IGNOREHEADER 1
        DELIMITER ','
        REGION '{region}';
        """
        
        try:
            run_redshift_statement(copy_sql)
            print(f"Loaded data into {table_name}")
        except Exception as e:
            print(f"Error loading data into {table_name}: {str(e)}")

# Load data from S3
load_data_from_s3()


Loading data into orders from orders.csv
Executing statement: 0a23526b-aa4a-4d34-9458-a6d0f8c0433a
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into orders
Loading data into order_items from order_items.csv
Executing statement: 84d54756-be6c-43c2-9bb8-4f47f5657c37
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into order_items
Loading data into payments from payments.csv
Executing statement: 8e48260c-1a43-45c8-a626-4ca55af5866c
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into payments
Loading data into reviews from reviews.csv
Executing statement: 6ba97e21-83cd-4e22-843c-068c0bfeb201
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into reviews


In [None]:
# Knowledge Base Configuration for IAM Role + Redshift Serverless WorkGroup
kbConfigParam = {
    "type": "SQL",
    "sqlKnowledgeBaseConfiguration": {
        "type": "REDSHIFT",
        "redshiftConfiguration": {
            "storageConfigurations": [{
                "type": "REDSHIFT",
                "redshiftConfiguration": {
                    "databaseName": REDSHIFT_DATABASE
                }
            }],
            "queryEngineConfiguration": {
                "type": "SERVERLESS",
                "serverlessConfiguration": {
                    "workgroupArn": workgroup_arn,
                    "authConfiguration": {
                        "type": "IAM"
                    }
                }
            }
        }
    }
}


In [None]:
# Create the Knowledge Base using IAM Role + Redshift Serverless WorkGroup
print("Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...")

knowledge_base = BedrockStructuredKnowledgeBase(
    kb_name=knowledge_base_name,
    kb_description=knowledge_base_description,
    workgroup_arn=workgroup_arn,
    kbConfigParam=kbConfigParam,
    generation_model=foundation_model,
    suffix=suffix
)

print("Knowledge Base created successfully!")
print(f"Knowledge Base ID: {knowledge_base.get_knowledge_base_id()}")


[2025-06-20 21:04:12,179] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...


[2025-06-20 21:04:12,950] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Step 1 - Creating Knowledge Base Execution Role (AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647) and Policies
Step 2 - Creating Knowledge Base
{ 'createdAt': datetime.datetime(2025, 6, 21, 4, 4, 13, 800686, tzinfo=tzutc()),
  'description': 'Sample Structured KB',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:533267284022:knowledge-base/WCNCTKFCKY',
  'knowledgeBaseConfiguration': { 'sqlKnowledgeBaseConfiguration': { 'redshiftConfiguration': { 'queryEngineConfiguration': { 'serverlessConfiguration': { 'authConfiguration': { 'type': 'IAM'},
                                                                                                                                                           'workgroupArn': 'arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa'},
                                                                                                                              'type': 'SERVERLESS'},
               

In [None]:
# Get the Bedrock execution role name
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Display the SQL commands that need to be executed in Redshift Query Editor
print("Execute the following SQL commands in your Redshift Query Editor:")
print("=" * 70)
print(f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;')
print(f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";')
print(f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";')
print("=" * 70)
print("\\nThese commands will:")
print("1. Create an IAM-based database user for the Bedrock execution role")
print("2. Grant USAGE permission on the public schema")
print("3. Grant SELECT permission on all tables in the public schema")
print("\\nNote: Adjust the schema and table permissions based on your specific requirements.")


Execute the following SQL commands in your Redshift Query Editor:
CREATE USER "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" WITH PASSWORD DISABLE;
GRANT USAGE ON SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
\nThese commands will:
1. Create an IAM-based database user for the Bedrock execution role
2. Grant USAGE permission on the public schema
3. Grant SELECT permission on all tables in the public schema
\nNote: Adjust the schema and table permissions based on your specific requirements.


In [None]:
# ensure that the kb is available
time.sleep(60)
# sync knowledge base
knowledge_base.start_ingestion_job()
# keep the kb_id for invocation later in the invoke request
kb_id = knowledge_base.get_knowledge_base_id()
# %store kb_id


job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'HOYDC9A0DV',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 4, 5, 39, 955341, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 4, 5, 43, 167059, tzinfo=tzutc())}
'WCNCTKFCKY'


In [None]:
query = "How many orders are there in total?"


In [None]:
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults": 5
                } 
            }
        }
    }
)

print(response['output']['text'], end='\n'*2)


ValidationException: An error occurred (ValidationException) when calling the RetrieveAndGenerate operation: No metadata found. Please trigger an ingestion and try again. (Service: BedrockAgentRuntime, Status Code: 400, Request ID: 4846aded-b9bf-403d-ad46-00cc9556ccc6) (SDK Attempt Count: 1)

In [None]:
response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id, 
    nextToken='string',
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 5,
        } 
    },
    retrievalQuery={
        "text": query
    }
)


In [None]:
import json
import pandas as pd

# Function to extract retrieved results from Retrieve API response into a pandas dataframe.
def response_print(retrieve_resp):
    # Extract the retrievalResults list
    retrieval_results = retrieve_resp['retrievalResults']

    # Dictionary to store the extracted data
    extracted_data = {}

    # Iterate through each item in retrievalResults
    for item in retrieval_results:
        row = item['content']['row']
        for col in row:
            column_name = col['columnName']
            column_value = col['columnValue']
            
            # If this column hasn't been seen before, create a new list for it
            if column_name not in extracted_data:
                extracted_data[column_name] = []
            
            # Append the value to the appropriate list
            extracted_data[column_name].append(column_value)

    # Create a DataFrame from the extracted data
    df = pd.DataFrame(extracted_data)
    return df
    
# Display the Retrieved results records
df = response_print(response_ret)
print(df.head())


In [None]:
query_response = bedrock_agent_runtime_client.generate_query(
    queryGenerationInput={
        "text": query,
        "type": "TEXT"
    },
    transformationConfiguration={
        "mode": "TEXT_TO_SQL",
        "textToSqlConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseArn": knowledge_base.knowledge_base['knowledgeBaseArn']
            }
        }
    }
)

generated_sql = query_response['queries'][0]['sql']
generated_sql


In [None]:
# Delete resources
# print("=============================== Deleting resources ==============================\n")
# knowledge_base.delete_kb(delete_iam_roles_and_policies=True)


In [2]:
# %pip install --upgrade pip --quiet
# %pip install -r ../requirements.txt --no-deps --quiet
# %pip install -r ../requirements.txt --upgrade --quiet
# %pip install --upgrade boto3
import boto3
print(boto3.__version__)


1.38.36


In [3]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")


In [4]:
# %load_ext autoreload
# %autoreload 2
# import warnings
# warnings.filterwarnings('ignore')


In [5]:
import sys
import logging
from pathlib import Path

current_path = Path().resolve()
current_path = current_path.parent

if str(current_path) not in sys.path:
    sys.path.append(str(current_path))

# Print sys.path to verify
print(sys.path)

from utils.structured_knowledge_base import BedrockStructuredKnowledgeBase


['/Users/manojs/Documents/Code/samples/05-agentic-rag/2-unstructure-structured-rag_agent', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python312.zip', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/lib-dynload', '', '/opt/homebrew/Caskroom/miniforge/base/envs/genai-on-aws/lib/python3.12/site-packages', '/Users/manojs/Documents/Code/samples/05-agentic-rag']


Setup and initialize boto3 clients


In [7]:
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session(region_name='us-west-2')
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id


('us-west-2', '533267284022')

In [8]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"

knowledge_base_name = f"bedrock-sample-structured-kb-{suffix}"
knowledge_base_description = "Sample Structured KB"

foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"


In [9]:
# Configuration for Redshift resources
REDSHIFT_NAMESPACE = f'sds-ecommerce-{suffix}'
REDSHIFT_WORKGROUP = f'sds-ecommerce-wg-{suffix}'
REDSHIFT_DATABASE = f'sds-ecommerce'
S3_BUCKET = f'sds-ecommerce-redshift-{suffix}'

print(f"Redshift Namespace: {REDSHIFT_NAMESPACE}")
print(f"Redshift Workgroup: {REDSHIFT_WORKGROUP}")
print(f"Database: {REDSHIFT_DATABASE}")
print(f"S3 Bucket: {S3_BUCKET}")


Redshift Namespace: sds-ecommerce-0205647
Redshift Workgroup: sds-ecommerce-wg-0205647
Database: sds-ecommerce
S3 Bucket: sds-ecommerce-redshift-0205647


In [10]:
def create_iam_role_for_redshift():
    """Create IAM role for Redshift to access S3"""
    try:
        # Get account ID
        account_id = sts_client.get_caller_identity()['Account']
        
        # Create IAM role if it doesn't exist
        role_name = f'RedshiftS3AccessRole-{suffix}'
        try:
            role_response = iam_client.get_role(RoleName=role_name)
            print(f'Role {role_name} already exists')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
        except iam_client.exceptions.NoSuchEntityException:
            trust_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "redshift.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
            
            iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy)
            )
            
            # Attach necessary policies
            iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )
            
            print(f'Created role {role_name}')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
            
    except Exception as e:
        print(f'Error creating IAM role: {str(e)}')
        raise

# Initialize additional clients
import json
import os
iam_client = boto3.client('iam')
redshift_client = boto3.client('redshift-serverless', region_name=region)
redshift_data_client = boto3.client('redshift-data', region_name=region)

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")

redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")


Created role RedshiftS3AccessRole-0205647
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647
Role RedshiftS3AccessRole-0205647 already exists
Redshift IAM Role ARN: arn:aws:iam::533267284022:role/RedshiftS3AccessRole-0205647


In [11]:
def create_redshift_namespace():
    """Create Redshift Serverless namespace"""
    try:
        # Check if namespace already exists
        try:
            response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
            print(f'Namespace {REDSHIFT_NAMESPACE} already exists')
            return response['namespace']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating namespace {REDSHIFT_NAMESPACE}...')
        
        # Create the namespace
        response = redshift_client.create_namespace(
            namespaceName=REDSHIFT_NAMESPACE,
            adminUsername='admin',
            adminUserPassword='TempPassword123!',  # Change this in production
            dbName=REDSHIFT_DATABASE,
            defaultIamRoleArn=redshift_role_arn,
            iamRoles=[redshift_role_arn]
        )
        
        print(f'Created namespace {REDSHIFT_NAMESPACE}')
        
        # Wait for namespace to be available
        print('Waiting for namespace to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                namespace_response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
                status = namespace_response['namespace']['status']
                if status == 'AVAILABLE':
                    print(f'Namespace {REDSHIFT_NAMESPACE} is now available')
                    return namespace_response['namespace']
                else:
                    print(f'Namespace status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking namespace status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for namespace, but proceeding...')
        return response['namespace']
        
    except Exception as e:
        print(f'Error creating namespace: {str(e)}')
        raise

# Create namespace
namespace = create_redshift_namespace()


Creating namespace sds-ecommerce-0205647...
Created namespace sds-ecommerce-0205647
Waiting for namespace to be available...
Namespace sds-ecommerce-0205647 is now available


In [12]:
def create_redshift_workgroup():
    """Create Redshift Serverless workgroup"""
    try:
        # Check if workgroup already exists
        try:
            response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
            print(f'Workgroup {REDSHIFT_WORKGROUP} already exists')
            return response['workgroup']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating workgroup {REDSHIFT_WORKGROUP}...')
        
        # Create the workgroup
        response = redshift_client.create_workgroup(
            workgroupName=REDSHIFT_WORKGROUP,
            namespaceName=REDSHIFT_NAMESPACE,
            baseCapacity=8,  # Minimum base capacity
            enhancedVpcRouting=False,
            publiclyAccessible=True,
            configParameters=[
                {
                    'parameterKey': 'enable_user_activity_logging',
                    'parameterValue': 'true'
                }
            ]
        )
        
        print(f'Created workgroup {REDSHIFT_WORKGROUP}')
        
        # Wait for workgroup to be available
        print('Waiting for workgroup to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                workgroup_response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
                status = workgroup_response['workgroup']['status']
                if status == 'AVAILABLE':
                    print(f'Workgroup {REDSHIFT_WORKGROUP} is now available')
                    return workgroup_response['workgroup']
                else:
                    print(f'Workgroup status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking workgroup status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for workgroup, but proceeding...')
        return response['workgroup']
        
    except Exception as e:
        print(f'Error creating workgroup: {str(e)}')
        raise

# Create workgroup
workgroup = create_redshift_workgroup()
workgroup_arn = workgroup['workgroupArn']
print(f"Workgroup ARN: {workgroup_arn}")


Creating workgroup sds-ecommerce-wg-0205647...
Created workgroup sds-ecommerce-wg-0205647
Waiting for workgroup to be available...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup status: CREATING, waiting...
Workgroup sds-ecommerce-wg-0205647 is now available
Workgroup ARN: arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa


## 2 - Create S3 Bucket and Load Sample Data

We will create an S3 bucket to stage our sample e-commerce data before loading it into Redshift tables.


In [13]:
def create_s3_bucket():
    """Create S3 bucket for data staging"""
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        print(f'Bucket {S3_BUCKET} already exists')
    except:
        try:
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=S3_BUCKET)
            else:
                s3_client.create_bucket(
                    Bucket=S3_BUCKET,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            print(f'Created bucket {S3_BUCKET}')
        except Exception as e:
            print(f'Error creating bucket: {str(e)}')
            raise

# Create S3 bucket
create_s3_bucket()


Created bucket sds-ecommerce-redshift-0205647


In [14]:
def upload_sample_data():
    """Upload sample CSV files to S3"""
    data_files = ['orders.csv', 'order_items.csv', 'payments.csv', 'reviews.csv']
    sds_directory = 'sample_data'
    
    print("Uploading sample data files to S3...")
    files_found = 0
    
    for file_name in data_files:
        local_path = os.path.join(sds_directory, file_name)
        if os.path.exists(local_path):
            # Get file size for informational purposes
            file_size = os.path.getsize(local_path)
            file_size_mb = file_size / (1024 * 1024)
            
            s3_client.upload_file(local_path, S3_BUCKET, file_name)
            print(f'Uploaded {file_name} ({file_size_mb:.1f} MB) to S3')
            files_found += 1
        else:
            print(f'Warning: {local_path} not found')
    
    if files_found == len(data_files):
        print(f"Successfully uploaded all {files_found} data files to S3")
    else:
        print(f"Only {files_found} out of {len(data_files)} files were found and uploaded")

# Upload sample data
upload_sample_data()


Uploading sample data files to S3...
Uploaded orders.csv (1.8 MB) to S3
Uploaded order_items.csv (1.3 MB) to S3
Uploaded payments.csv (0.8 MB) to S3
Uploaded reviews.csv (0.5 MB) to S3
Successfully uploaded all 4 data files to S3


In [24]:
# Execute the required SQL commands to grant database access to Bedrock execution role
print("🔧 Executing SQL commands to grant database access to Bedrock execution role...")

bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Step 1: Create the IAM user in Redshift database
create_user_sql = f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;'

try:
    print(f"Creating user: IAMR:{bedrock_role_name}")
    run_redshift_statement(create_user_sql)
    print("✅ IAM user created successfully!")
except Exception as e:
    if "already exists" in str(e).lower():
        print("ℹ️ User already exists, continuing...")
    else:
        print(f"❌ Error creating user: {str(e)}")
        raise

# Step 2: Grant USAGE permission on public schema
grant_usage_sql = f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting USAGE on schema public to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_usage_sql)
    print("✅ USAGE permission granted successfully!")
except Exception as e:
    print(f"❌ Error granting USAGE permission: {str(e)}")
    raise

# Step 3: Grant SELECT permission on all tables in public schema
grant_select_sql = f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting SELECT on all tables to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_select_sql)
    print("✅ SELECT permissions granted successfully!")
except Exception as e:
    print(f"❌ Error granting SELECT permissions: {str(e)}")
    raise

print("\\n🎉 All database permissions have been granted to the Bedrock execution role!")
print("The Knowledge Base should now be able to access the database tables.")


🔧 Executing SQL commands to grant database access to Bedrock execution role...
Creating user: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: cb83ac9a-aa81-41ad-8bde-91ceb92cfc0c
Statement status: SUBMITTED, waiting...
Statement status: STARTED, waiting...
Statement status: STARTED, waiting...
Error executing statement: Statement failed: ERROR: user "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" already exists
ℹ️ User already exists, continuing...
Granting USAGE on schema public to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: c948acbc-0201-440d-b2e1-fee3521a65ac
Statement status: PICKED, waiting...
Statement completed successfully
✅ USAGE permission granted successfully!
Granting SELECT on all tables to: IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Executing statement: 6c413ca2-d25d-41bb-ae99-d28f3506ee19
Statement status: PICKED, waiting...
Statement completed su

In [25]:
# Retry the ingestion job now that permissions are fixed
print("🔄 Retrying the ingestion job with proper permissions...")

try:
    # Start a new ingestion job
    response = knowledge_base.start_ingestion_job()
    print("✅ New ingestion job started successfully!")
    print(f"Ingestion Job Response: {response}")
    
    # Wait a bit for the job to process
    import time
    print("⏳ Waiting for ingestion job to complete...")
    time.sleep(30)
    
    # Check the ingestion job status
    job_id = response['ingestionJobId']
    job_status = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=response['dataSourceId'],
        ingestionJobId=job_id
    )
    
    print(f"\\n📊 Ingestion Job Status: {job_status['ingestionJob']['status']}")
    if job_status['ingestionJob']['status'] == 'COMPLETE':
        print("🎉 Ingestion job completed successfully!")
    elif job_status['ingestionJob']['status'] == 'IN_PROGRESS':
        print("⏳ Ingestion job is still in progress. Wait a few more minutes.")
    else:
        print(f"❌ Ingestion job status: {job_status['ingestionJob']['status']}")
        if 'failureReasons' in job_status['ingestionJob']:
            print(f"Failure reasons: {job_status['ingestionJob']['failureReasons']}")
            
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🔄 Retrying the ingestion job with proper permissions...
job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'YBQXYQU4WL',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 7, 12, 41, 597571, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 7, 12, 44, 860633, tzinfo=tzutc())}
✅ New ingestion job started successfully!
Ingestion Job Response: None
⏳ Waiting for ingestion job to complete...
❌ Error starting ingestion job: 'NoneType' object is not subscriptable


TypeError: 'NoneType' object is not subscriptable

In [None]:
# Helper function to check ingestion job status
def check_ingestion_status(kb_id, data_source_id=None, job_id=None):
    """Check the status of ingestion jobs for a knowledge base"""
    try:
        # If no data_source_id provided, get the first one
        if not data_source_id:
            data_sources = bedrock_agent_client.list_data_sources(
                knowledgeBaseId=kb_id,
                maxResults=10
            )
            if data_sources['dataSourceSummaries']:
                data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
            else:
                print("❌ No data sources found for this knowledge base")
                return
        
        # List ingestion jobs
        ingestion_jobs = bedrock_agent_client.list_ingestion_jobs(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id,
            maxResults=5
        )
        
        print(f"📋 Recent ingestion jobs for Knowledge Base {kb_id}:")
        print("=" * 80)
        
        for i, job in enumerate(ingestion_jobs['ingestionJobSummaries'][:3]):
            status_emoji = {
                'COMPLETE': '✅',
                'IN_PROGRESS': '⏳', 
                'FAILED': '❌',
                'STARTING': '🔄'
            }.get(job['status'], '❓')
            
            print(f"{i+1}. Job ID: {job['ingestionJobId']}")
            print(f"   Status: {status_emoji} {job['status']}")
            print(f"   Started: {job['startedAt']}")
            print(f"   Updated: {job['updatedAt']}")
            print()
            
    except Exception as e:
        print(f"❌ Error checking ingestion status: {str(e)}")

# Check current ingestion status
print("🔍 Checking current ingestion job status...")
check_ingestion_status(kb_id)


In [26]:
# Fix the IAM service role permissions and retry ingestion
print("🔧 Checking and fixing IAM service role permissions...")

# Get the current service role
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name
role_arn = f"arn:aws:iam::{account_id}:role/{bedrock_role_name}"

print(f"Service Role: {bedrock_role_name}")
print(f"Service Role ARN: {role_arn}")

# Create a comprehensive policy that matches AWS documentation requirements
enhanced_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RedshiftDataAPIStatementPermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:GetStatementResult",
                "redshift-data:DescribeStatement", 
                "redshift-data:CancelStatement"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "redshift-data:statement-owner-iam-userid": "${aws:userid}"
                }
            }
        },
        {
            "Sid": "RedshiftDataAPIExecutePermissions",
            "Effect": "Allow",
            "Action": [
                "redshift-data:ExecuteStatement"
            ],
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "RedshiftServerlessGetCredentials",
            "Effect": "Allow",
            "Action": "redshift-serverless:GetCredentials",
            "Resource": [
                workgroup_arn
            ]
        },
        {
            "Sid": "SqlWorkbenchAccess",
            "Effect": "Allow",
            "Action": [
                "sqlworkbench:GetSqlRecommendations",
                "sqlworkbench:PutSqlGenerationContext",
                "sqlworkbench:GetSqlGenerationContext", 
                "sqlworkbench:DeleteSqlGenerationContext"
            ],
            "Resource": "*"
        },
        {
            "Sid": "BedrockAccess",
            "Effect": "Allow",
            "Action": [
                "bedrock:GenerateQuery",
                "bedrock:InvokeModel",
                "bedrock:Retrieve",
                "bedrock:RetrieveAndGenerate"
            ],
            "Resource": "*"
        },
        {
            "Sid": "LoggingPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

# Create and attach the enhanced policy
enhanced_policy_name = f'EnhancedBedrockRedshiftPolicy-{suffix}'

try:
    # Create the enhanced policy
    enhanced_policy_response = iam_client.create_policy(
        PolicyName=enhanced_policy_name,
        PolicyDocument=json.dumps(enhanced_policy_document),
        Description='Enhanced policy for Bedrock Knowledge Base with Redshift access'
    )
    enhanced_policy_arn = enhanced_policy_response['Policy']['Arn']
    print(f"✅ Created enhanced policy: {enhanced_policy_name}")
    
    # Attach the enhanced policy to the role
    iam_client.attach_role_policy(
        RoleName=bedrock_role_name,
        PolicyArn=enhanced_policy_arn
    )
    print(f"✅ Attached enhanced policy to role: {bedrock_role_name}")
    
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"ℹ️ Enhanced policy {enhanced_policy_name} already exists")
    enhanced_policy_arn = f"arn:aws:iam::{account_id}:policy/{enhanced_policy_name}"
    try:
        iam_client.attach_role_policy(
            RoleName=bedrock_role_name,
            PolicyArn=enhanced_policy_arn
        )
        print(f"✅ Attached existing enhanced policy to role")
    except iam_client.exceptions.EntityAlreadyExistsException:
        print(f"ℹ️ Enhanced policy already attached to role")
except Exception as e:
    print(f"❌ Error creating/attaching enhanced policy: {str(e)}")

print("\\n🔄 Waiting 30 seconds for IAM changes to propagate...")
import time
time.sleep(30)


🔧 Checking and fixing IAM service role permissions...
Service Role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
Service Role ARN: arn:aws:iam::533267284022:role/AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
✅ Created enhanced policy: EnhancedBedrockRedshiftPolicy-0205647
✅ Attached enhanced policy to role: AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647
\n🔄 Waiting 30 seconds for IAM changes to propagate...


In [27]:
# Start ingestion job directly using Bedrock Agent client
print("🚀 Starting ingestion job with enhanced permissions...")

try:
    # Get the data source ID
    data_sources = bedrock_agent_client.list_data_sources(
        knowledgeBaseId=kb_id,
        maxResults=10
    )
    
    if not data_sources['dataSourceSummaries']:
        print("❌ No data sources found for this knowledge base")
    else:
        data_source_id = data_sources['dataSourceSummaries'][0]['dataSourceId']
        print(f"📋 Using data source ID: {data_source_id}")
        
        # Start the ingestion job directly
        start_job_response = bedrock_agent_client.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id
        )
        
        job = start_job_response["ingestionJob"]
        print(f"✅ Ingestion job started successfully!")
        print(f"📊 Job ID: {job['ingestionJobId']}")
        print(f"📊 Initial Status: {job['status']}")
        
        # Monitor the job status
        job_id = job['ingestionJobId']
        max_attempts = 20
        wait_time = 15
        
        print(f"\\n⏳ Monitoring ingestion job progress...")
        
        for attempt in range(max_attempts):
            try:
                get_job_response = bedrock_agent_client.get_ingestion_job(
                    knowledgeBaseId=kb_id,
                    dataSourceId=data_source_id,
                    ingestionJobId=job_id
                )
                
                current_job = get_job_response["ingestionJob"]
                status = current_job['status']
                
                status_emoji = {
                    'COMPLETE': '✅',
                    'IN_PROGRESS': '⏳',
                    'STARTING': '🔄', 
                    'FAILED': '❌',
                    'STOPPED': '⏹️'
                }.get(status, '❓')
                
                print(f"Attempt {attempt + 1}/{max_attempts}: {status_emoji} Status: {status}")
                
                if status == 'COMPLETE':
                    print("\\n🎉 Ingestion job completed successfully!")
                    print("🎯 Knowledge Base is now ready to answer queries!")
                    break
                elif status == 'FAILED':
                    print("\\n❌ Ingestion job failed!")
                    if 'failureReasons' in current_job:
                        print(f"Failure reasons: {current_job['failureReasons']}")
                    if 'statistics' in current_job:
                        print(f"Statistics: {current_job['statistics']}")
                    break
                elif status == 'STOPPED':
                    print("\\n⏹️ Ingestion job was stopped!")
                    break
                elif status in ['IN_PROGRESS', 'STARTING']:
                    if attempt < max_attempts - 1:
                        print(f"   ⏳ Waiting {wait_time} seconds before next check...")
                        time.sleep(wait_time)
                    else:
                        print("\\n⏰ Timeout reached. Job may still be running in background.")
                        print("Use the check_ingestion_status function to monitor progress.")
                
            except Exception as e:
                print(f"❌ Error checking job status: {str(e)}")
                if attempt < max_attempts - 1:
                    print(f"   🔄 Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print("\\n❌ Maximum retries reached for status check")
                    break
                
except Exception as e:
    print(f"❌ Error starting ingestion job: {str(e)}")
    raise


🚀 Starting ingestion job with enhanced permissions...
📋 Using data source ID: EOZFBUZD6F
✅ Ingestion job started successfully!
📊 Job ID: JKTRBFLDGT
📊 Initial Status: STARTING
\n⏳ Monitoring ingestion job progress...
Attempt 1/20: ⏳ Status: IN_PROGRESS
   ⏳ Waiting 15 seconds before next check...
Attempt 2/20: ❌ Status: FAILED
\n❌ Ingestion job failed!


In [28]:
# Test the Knowledge Base once ingestion is successful
def test_knowledge_base():
    """Test the Knowledge Base with sample queries"""
    print("🧪 Testing Knowledge Base functionality...")
    
    # Test queries
    test_queries = [
        "How many orders are there in total?",
        "What is the average order value?", 
        "Which payment method is most popular?",
        "How many different products have been ordered?"
    ]
    
    foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
    
    for i, query in enumerate(test_queries, 1):
        print(f"\\n🔍 Test Query {i}: {query}")
        print("-" * 50)
        
        try:
            # Test with retrieve_and_generate
            response = bedrock_agent_runtime_client.retrieve_and_generate(
                input={"text": query},
                retrieveAndGenerateConfiguration={
                    "type": "KNOWLEDGE_BASE",
                    "knowledgeBaseConfiguration": {
                        'knowledgeBaseId': kb_id,
                        "modelArn": f"arn:aws:bedrock:{region}::foundation-model/{foundation_model}",
                        "retrievalConfiguration": {
                            "vectorSearchConfiguration": {
                                "numberOfResults": 5
                            } 
                        }
                    }
                }
            )
            
            print(f"✅ Response: {response['output']['text']}")
            
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            
        print()

# Note: Only run this after ingestion completes successfully
print("📝 Note: Use test_knowledge_base() function after ingestion completes")
print("Example: test_knowledge_base()")


📝 Note: Use test_knowledge_base() function after ingestion completes
Example: test_knowledge_base()


In [15]:
def wait_for_statement(statement_id):
    """Wait for a Redshift Data API statement to complete"""
    max_attempts = 30
    for attempt in range(max_attempts):
        try:
            response = redshift_data_client.describe_statement(Id=statement_id)
            status = response['Status']
            if status == 'FINISHED':
                return response
            elif status == 'FAILED':
                raise Exception(f"Statement failed: {response.get('Error', 'Unknown error')}")
            elif status == 'CANCELLED':
                raise Exception("Statement was cancelled")
            else:
                print(f"Statement status: {status}, waiting...")
                time.sleep(5)
        except Exception as e:
            if "Statement failed" in str(e) or "cancelled" in str(e):
                raise
            print(f"Error checking statement status: {str(e)}, retrying...")
            time.sleep(5)
    
    raise Exception("Timeout waiting for statement to complete")

def run_redshift_statement(sql_statement):
    """Execute a SQL statement in Redshift"""
    try:
        response = redshift_data_client.execute_statement(
            WorkgroupName=REDSHIFT_WORKGROUP,
            Database=REDSHIFT_DATABASE,
            Sql=sql_statement
        )
        statement_id = response['Id']
        print(f"Executing statement: {statement_id}")
        result = wait_for_statement(statement_id)
        print(f"Statement completed successfully")
        return result
    except Exception as e:
        print(f"Error executing statement: {str(e)}")
        raise


In [16]:
# Create tables in Redshift
def create_tables():
    """Create all necessary tables in Redshift"""
    
    # Orders table
    orders_sql = """
    CREATE TABLE IF NOT EXISTS orders (
        order_id VARCHAR(255) PRIMARY KEY,
        customer_id VARCHAR(255),
        order_total DECIMAL(10,2),
        order_status VARCHAR(50),
        payment_method VARCHAR(50),
        shipping_address TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    );
    """
    
    # Order Items table
    order_items_sql = """
    CREATE TABLE IF NOT EXISTS order_items (
        order_item_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        product_id VARCHAR(255),
        quantity INTEGER,
        price DECIMAL(10,2)
    );
    """
    
    # Payments table
    payments_sql = """
    CREATE TABLE IF NOT EXISTS payments (
        payment_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        customer_id VARCHAR(255),
        amount DECIMAL(10,2),
        payment_method VARCHAR(50),
        payment_status VARCHAR(50),
        created_at DATE
    );
    """
    
    # Reviews table
    reviews_sql = """
    CREATE TABLE IF NOT EXISTS reviews (
        review_id VARCHAR(255) PRIMARY KEY,
        product_id VARCHAR(255),
        customer_id VARCHAR(255),
        rating INTEGER,
        created_at DATE
    );
    """
    
    tables = {
        'orders': orders_sql,
        'order_items': order_items_sql,
        'payments': payments_sql,
        'reviews': reviews_sql
    }
    
    for table_name, sql in tables.items():
        print(f"Creating table: {table_name}")
        run_redshift_statement(sql)
        print(f"Created table: {table_name}")
        print("-------------")

# Create tables
create_tables()


Creating table: orders
Executing statement: 4d669868-ea32-4ce0-9180-c977ebc99514
Statement status: PICKED, waiting...
Statement completed successfully
Created table: orders
-------------
Creating table: order_items
Executing statement: ea378983-2fac-43f2-b3ce-9c8d016ef05d
Statement status: PICKED, waiting...
Statement completed successfully
Created table: order_items
-------------
Creating table: payments
Executing statement: a75c6c7a-9c38-4b72-8c74-82563692520b
Statement status: SUBMITTED, waiting...
Statement completed successfully
Created table: payments
-------------
Creating table: reviews
Executing statement: e227ecbd-4eb6-4a0b-a9d9-f41049822560
Statement status: PICKED, waiting...
Statement completed successfully
Created table: reviews
-------------


In [17]:
# Load data from S3 into Redshift tables
def load_data_from_s3():
    """Load data from S3 CSV files into Redshift tables"""
    
    tables_and_files = {
        'orders': 'orders.csv',
        'order_items': 'order_items.csv',
        'payments': 'payments.csv',
        'reviews': 'reviews.csv'
    }
    
    for table_name, file_name in tables_and_files.items():
        print(f"Loading data into {table_name} from {file_name}")
        
        copy_sql = f"""
        COPY {table_name}
        FROM 's3://{S3_BUCKET}/{file_name}'
        IAM_ROLE '{redshift_role_arn}'
        CSV
        IGNOREHEADER 1
        DELIMITER ','
        REGION '{region}';
        """
        
        try:
            run_redshift_statement(copy_sql)
            print(f"Loaded data into {table_name}")
        except Exception as e:
            print(f"Error loading data into {table_name}: {str(e)}")

# Load data from S3
load_data_from_s3()


Loading data into orders from orders.csv
Executing statement: 0a23526b-aa4a-4d34-9458-a6d0f8c0433a
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into orders
Loading data into order_items from order_items.csv
Executing statement: 84d54756-be6c-43c2-9bb8-4f47f5657c37
Statement status: PICKED, waiting...
Statement completed successfully
Loaded data into order_items
Loading data into payments from payments.csv
Executing statement: 8e48260c-1a43-45c8-a626-4ca55af5866c
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into payments
Loading data into reviews from reviews.csv
Executing statement: 6ba97e21-83cd-4e22-843c-068c0bfeb201
Statement status: SUBMITTED, waiting...
Statement completed successfully
Loaded data into reviews


In [18]:
# Knowledge Base Configuration for IAM Role + Redshift Serverless WorkGroup
kbConfigParam = {
    "type": "SQL",
    "sqlKnowledgeBaseConfiguration": {
        "type": "REDSHIFT",
        "redshiftConfiguration": {
            "storageConfigurations": [{
                "type": "REDSHIFT",
                "redshiftConfiguration": {
                    "databaseName": REDSHIFT_DATABASE
                }
            }],
            "queryEngineConfiguration": {
                "type": "SERVERLESS",
                "serverlessConfiguration": {
                    "workgroupArn": workgroup_arn,
                    "authConfiguration": {
                        "type": "IAM"
                    }
                }
            }
        }
    }
}


In [19]:
# Create the Knowledge Base using IAM Role + Redshift Serverless WorkGroup
print("Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...")

knowledge_base = BedrockStructuredKnowledgeBase(
    kb_name=knowledge_base_name,
    kb_description=knowledge_base_description,
    workgroup_arn=workgroup_arn,
    kbConfigParam=kbConfigParam,
    generation_model=foundation_model,
    suffix=suffix
)

print("Knowledge Base created successfully!")
print(f"Knowledge Base ID: {knowledge_base.get_knowledge_base_id()}")


[2025-06-20 21:04:12,179] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Creating Knowledge Base with IAM Role + Redshift Serverless WorkGroup access pattern...


[2025-06-20 21:04:12,950] p74707 {credentials.py:1352} INFO - Found credentials in shared credentials file: ~/.aws/credentials


Step 1 - Creating Knowledge Base Execution Role (AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647) and Policies
Step 2 - Creating Knowledge Base
{ 'createdAt': datetime.datetime(2025, 6, 21, 4, 4, 13, 800686, tzinfo=tzutc()),
  'description': 'Sample Structured KB',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:533267284022:knowledge-base/WCNCTKFCKY',
  'knowledgeBaseConfiguration': { 'sqlKnowledgeBaseConfiguration': { 'redshiftConfiguration': { 'queryEngineConfiguration': { 'serverlessConfiguration': { 'authConfiguration': { 'type': 'IAM'},
                                                                                                                                                           'workgroupArn': 'arn:aws:redshift-serverless:us-west-2:533267284022:workgroup/a68f79bf-ace3-46d7-abc3-3ee32e4998aa'},
                                                                                                                              'type': 'SERVERLESS'},
               

In [20]:
# Get the Bedrock execution role name
bedrock_role_name = knowledge_base.bedrock_kb_execution_role_name

# Display the SQL commands that need to be executed in Redshift Query Editor
print("Execute the following SQL commands in your Redshift Query Editor:")
print("=" * 70)
print(f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;')
print(f'GRANT USAGE ON SCHEMA public TO "IAMR:{bedrock_role_name}";')
print(f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";')
print("=" * 70)
print("\\nThese commands will:")
print("1. Create an IAM-based database user for the Bedrock execution role")
print("2. Grant USAGE permission on the public schema")
print("3. Grant SELECT permission on all tables in the public schema")
print("\\nNote: Adjust the schema and table permissions based on your specific requirements.")


Execute the following SQL commands in your Redshift Query Editor:
CREATE USER "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647" WITH PASSWORD DISABLE;
GRANT USAGE ON SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:AmazonBedrockExecutionRoleForStructuredKnowledgeBase_0205647";
\nThese commands will:
1. Create an IAM-based database user for the Bedrock execution role
2. Grant USAGE permission on the public schema
3. Grant SELECT permission on all tables in the public schema
\nNote: Adjust the schema and table permissions based on your specific requirements.


In [21]:
# ensure that the kb is available
time.sleep(60)
# sync knowledge base
knowledge_base.start_ingestion_job()
# keep the kb_id for invocation later in the invoke request
kb_id = knowledge_base.get_knowledge_base_id()
# %store kb_id


job  started successfully

{ 'dataSourceId': 'EOZFBUZD6F',
  'ingestionJobId': 'HOYDC9A0DV',
  'knowledgeBaseId': 'WCNCTKFCKY',
  'startedAt': datetime.datetime(2025, 6, 21, 4, 5, 39, 955341, tzinfo=tzutc()),
  'status': 'FAILED',
  'updatedAt': datetime.datetime(2025, 6, 21, 4, 5, 43, 167059, tzinfo=tzutc())}
'WCNCTKFCKY'


In [22]:
query = "How many orders are there in total?"


In [23]:
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults": 5
                } 
            }
        }
    }
)

print(response['output']['text'], end='\n'*2)


ValidationException: An error occurred (ValidationException) when calling the RetrieveAndGenerate operation: No metadata found. Please trigger an ingestion and try again. (Service: BedrockAgentRuntime, Status Code: 400, Request ID: 4846aded-b9bf-403d-ad46-00cc9556ccc6) (SDK Attempt Count: 1)

In [None]:
response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id, 
    nextToken='string',
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 5,
        } 
    },
    retrievalQuery={
        "text": query
    }
)


In [None]:
import json
import pandas as pd

# Function to extract retrieved results from Retrieve API response into a pandas dataframe.
def response_print(retrieve_resp):
    # Extract the retrievalResults list
    retrieval_results = retrieve_resp['retrievalResults']

    # Dictionary to store the extracted data
    extracted_data = {}

    # Iterate through each item in retrievalResults
    for item in retrieval_results:
        row = item['content']['row']
        for col in row:
            column_name = col['columnName']
            column_value = col['columnValue']
            
            # If this column hasn't been seen before, create a new list for it
            if column_name not in extracted_data:
                extracted_data[column_name] = []
            
            # Append the value to the appropriate list
            extracted_data[column_name].append(column_value)

    # Create a DataFrame from the extracted data
    df = pd.DataFrame(extracted_data)
    return df
    
# Display the Retrieved results records
df = response_print(response_ret)
print(df.head())


In [None]:
query_response = bedrock_agent_runtime_client.generate_query(
    queryGenerationInput={
        "text": query,
        "type": "TEXT"
    },
    transformationConfiguration={
        "mode": "TEXT_TO_SQL",
        "textToSqlConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseArn": knowledge_base.knowledge_base['knowledgeBaseArn']
            }
        }
    }
)

generated_sql = query_response['queries'][0]['sql']
generated_sql


In [None]:
# Delete resources
# print("=============================== Deleting resources ==============================\n")
# knowledge_base.delete_kb(delete_iam_roles_and_policies=True)
