# Create Structured Amazon Bedrock Knowledge Base with Redshift

This notebook demonstrates how to create and configure an Amazon Bedrock Knowledge Base that uses Amazon Redshift Serverless as a source for structured data. 

The Knowledge Base integrates Amazon Redshift as the data source for e-commerce transactional data and enables RAG  by powering queries over structured business data including orders, payments, reviews, and customer analytics.

This structured knowledge base will be used in conjunction with the unstructured knowledge base to create agentic RAG using Strands Agents


![Structured Knowledge Base](../images/structured_kb.png)

## Setup and Prerequisites

### Prerequisites
* Python 3.13
* AWS account with appropriate permissions
* Amazon Bedrock foundation models enabled
* IAM permissions for Amazon Redshift Serverless, Amazon S3, and Amazon Bedrock

### Required AWS Services
- **Amazon Bedrock**: For knowledge base creation and LLM inference
- **Amazon Redshift Serverless**: As the structured data source
- **Amazon S3**: For data staging and intermediate storage
- **AWS IAM**: For service permissions and roles

Let's start by importing the required libraries and setting up AWS clients:


Import required libraries for AWS service interaction, Redshift management, and data handling:

In [None]:
import json
import logging
import os
import random
import string
import time
import uuid
from datetime import datetime

import boto3
import requests

Initialize AWS service clients for S3, STS, IAM, Redshift, and Bedrock to manage infrastructure and Knowledge Base creation:

In [None]:
# Initialize AWS clients
session = boto3.session.Session()
region = session.region_name

s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
redshift_client = boto3.client('redshift-serverless', region_name=region)
redshift_data_client = boto3.client('redshift-data', region_name=region)
iam_client = boto3.client('iam')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime")

Generate a unique random suffix for AWS resource names. This prevents naming conflicts when multiple participants run the workshop simultaneously in the same AWS account.

In [None]:
# Generate unique suffix for resource names
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))

print(f"Using suffix: {suffix}")

## Step 1: Import Amazon Bedrock Knowledge Bases helper

Lets import the structured knowledge base utility to help with Knowledge Base configuration and creation.


In [None]:
import os
if 'Lab 2' in os.getcwd():
    %cd ..
else:
    print(os.getcwd())

from utils.structured_knowledge_base import BedrockStructuredKnowledgeBase

## Step 2: Set up Redshift Serverless Infrastructure

Next we will create the necessary Redshift Serverless components: namespace and workgroup. This infrastructure will host our structured data that the Knowledge Base will query.

- The namespace is a logical grouping of database objects and users. It contains the database, schemas, and other objects:
- The workgroup provides compute resources and configuration settings for running queries against the namespace:


Define configuration constants for Redshift Serverless resources that will store the structured e-commerce data:

In [None]:
# Configuration for Redshift resources
REDSHIFT_NAMESPACE = f'sds-ecommerce-{suffix}'
REDSHIFT_WORKGROUP = f'sds-ecommerce-wg-{suffix}'
REDSHIFT_DATABASE = f'sds-ecommerce'
S3_BUCKET = f'sds-ecommerce-redshift-{suffix}'

print(f"Redshift Namespace: {REDSHIFT_NAMESPACE}")
print(f"Redshift Workgroup: {REDSHIFT_WORKGROUP}")
print(f"Database: {REDSHIFT_DATABASE}")
print(f"S3 Bucket: {S3_BUCKET}")

### Create IAM Role for Redshift

Create an IAM role that allows Redshift to access S3 for data loading operations

In [None]:
def create_iam_role_for_redshift():
    """Create IAM role for Redshift to access S3"""
    try:
        # Get account ID
        account_id = sts_client.get_caller_identity()['Account']
        
        # Create IAM role if it doesn't exist
        role_name = f'RedshiftS3AccessRole-{suffix}'
        try:
            role_response = iam_client.get_role(RoleName=role_name)
            print(f'Role {role_name} already exists')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
        except iam_client.exceptions.NoSuchEntityException:
            trust_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "redshift.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
            
            iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy)
            )
            
            # Attach necessary policies
            iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )
            
            print(f'Created role {role_name}')
            return f'arn:aws:iam::{account_id}:role/{role_name}'
            
    except Exception as e:
        print(f'Error creating IAM role: {str(e)}')
        raise



redshift_role_arn = create_iam_role_for_redshift()
print(f"Redshift IAM Role ARN: {redshift_role_arn}")


This function creates the Redshift Serverless namespace, which provides the compute and storage resources for your data warehouse.

In [None]:
def create_redshift_namespace():
    """Create Redshift Serverless namespace"""
    try:
        # Check if namespace already exists
        try:
            response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
            print(f'Namespace {REDSHIFT_NAMESPACE} already exists')
            return response['namespace']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating namespace {REDSHIFT_NAMESPACE}...')
        
        # Create the namespace
        response = redshift_client.create_namespace(
            namespaceName=REDSHIFT_NAMESPACE,
            adminUsername='admin',
            adminUserPassword='TempPassword123!',  # Change this in production
            dbName=REDSHIFT_DATABASE,
            defaultIamRoleArn=redshift_role_arn,
            iamRoles=[redshift_role_arn]
        )
        
        print(f'Created namespace {REDSHIFT_NAMESPACE}')
        
        # Wait for namespace to be available
        print('Waiting for namespace to be available...')
        max_attempts = 30
        for attempt in range(max_attempts):
            try:
                namespace_response = redshift_client.get_namespace(namespaceName=REDSHIFT_NAMESPACE)
                status = namespace_response['namespace']['status']
                if status == 'AVAILABLE':
                    print(f'Namespace {REDSHIFT_NAMESPACE} is now available')
                    return namespace_response['namespace']
                else:
                    print(f'Namespace status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking namespace status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for namespace, but proceeding...')
        return response['namespace']
        
    except Exception as e:
        print(f'Error creating namespace: {str(e)}')
        raise

# Create namespace
namespace = create_redshift_namespace()


This function creates the Redshift Serverless workgroup, which defines the compute capacity and network configuration for query execution.

In [None]:
def create_redshift_workgroup():
    """Create Redshift Serverless workgroup"""
    try:
        # Check if workgroup already exists
        try:
            response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
            print(f'Workgroup {REDSHIFT_WORKGROUP} already exists')
            return response['workgroup']
        except redshift_client.exceptions.ResourceNotFoundException:
            print(f'Creating workgroup {REDSHIFT_WORKGROUP}...')
        
        # Create the workgroup
        response = redshift_client.create_workgroup(
            workgroupName=REDSHIFT_WORKGROUP,
            namespaceName=REDSHIFT_NAMESPACE,
            baseCapacity=8,  # Minimum base capacity
            enhancedVpcRouting=False,
            publiclyAccessible=True,
            configParameters=[
                {
                    'parameterKey': 'enable_user_activity_logging',
                    'parameterValue': 'true'
                }
            ]
        )
        
        print(f'Created workgroup {REDSHIFT_WORKGROUP}')
        
        # Wait for workgroup to be available
        print('Waiting for workgroup to be available...')
        max_attempts = 45  # 7.5 minutes
        for attempt in range(max_attempts):
            try:
                workgroup_response = redshift_client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)
                status = workgroup_response['workgroup']['status']
                if status == 'AVAILABLE':
                    print(f'Workgroup {REDSHIFT_WORKGROUP} is now available')
                    return workgroup_response['workgroup']
                else:
                    print(f'Workgroup status: {status}, waiting...')
                    time.sleep(10)
            except Exception as e:
                print(f'Error checking workgroup status: {str(e)}, retrying...')
                time.sleep(10)
        
        print('Timeout waiting for workgroup, but proceeding...')
        return response['workgroup']
        
    except Exception as e:
        print(f'Error creating workgroup: {str(e)}')
        raise

# Create workgroup
workgroup = create_redshift_workgroup()
workgroup_arn = workgroup['workgroupArn']
print(f"Workgroup ARN: {workgroup_arn}")


## Step 3: Create S3 Bucket and Load Sample Data

We will create an S3 bucket to stage our sample e-commerce data before loading it into Redshift tables.

In [None]:
def create_s3_bucket():
    """Create S3 bucket for data staging"""
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        print(f'Bucket {S3_BUCKET} already exists')
    except:
        try:
            if region == 'us-east-1':
                s3_client.create_bucket(Bucket=S3_BUCKET)
            else:
                s3_client.create_bucket(
                    Bucket=S3_BUCKET,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            print(f'Created bucket {S3_BUCKET}')
        except Exception as e:
            print(f'Error creating bucket: {str(e)}')
            raise

# Create S3 bucket
create_s3_bucket()

This function uploads the sample e-commerce CSV files (orders, products, customers) to the S3 bucket for loading into Redshift.

In [None]:
def upload_sample_data():
    """Upload sample CSV files to S3"""
    data_files = ['orders.csv', 'order_items.csv', 'payments.csv', 'reviews.csv']
    sds_directory = 'sample_structured_data'
    
    print("Uploading sample data files to S3...")
    files_found = 0
    
    for file_name in data_files:
        local_path = os.path.join(sds_directory, file_name)
        if os.path.exists(local_path):
            # Get file size for informational purposes
            file_size = os.path.getsize(local_path)
            file_size_mb = file_size / (1024 * 1024)
            
            s3_client.upload_file(local_path, S3_BUCKET, file_name)
            print(f'Uploaded {file_name} ({file_size_mb:.1f} MB) to S3')
            files_found += 1
        else:
            print(f'Warning: {local_path} not found')
    
    if files_found == len(data_files):
        print(f"\nSuccessfully uploaded all {files_found} data files to S3")
    else:
        print(f"\nOnly {files_found} out of {len(data_files)} files were found and uploaded")

# Upload sample data
upload_sample_data()


## Step 4: Create Redshift Tables and Load Data

Now we will create the database tables in Redshift and load our sample e-commerce data.

### Define Redshift Data API Helper Functions

These functions help us execute SQL statements using the Redshift Data API:


In [None]:
def wait_for_statement(statement_id):
    """Wait for a Redshift Data API statement to complete"""
    max_attempts = 30
    for attempt in range(max_attempts):
        try:
            response = redshift_data_client.describe_statement(Id=statement_id)
            status = response['Status']
            if status == 'FINISHED':
                return response
            elif status == 'FAILED':
                raise Exception(f"Statement failed: {response.get('Error', 'Unknown error')}")
            elif status == 'CANCELLED':
                raise Exception("Statement was cancelled")
            else:
                print(f"Statement status: {status}, waiting...")
                time.sleep(5)
        except Exception as e:
            if 'Statement failed' in str(e) or 'cancelled' in str(e):
                raise
            print(f"Error checking statement status: {str(e)}, retrying...")
            time.sleep(5)
    
    raise Exception("Timeout waiting for statement to complete")

def run_redshift_statement(sql_statement):
    """Execute a SQL statement in Redshift"""
    try:
        response = redshift_data_client.execute_statement(
            WorkgroupName=REDSHIFT_WORKGROUP,
            Database=REDSHIFT_DATABASE,
            Sql=sql_statement
        )
        statement_id = response['Id']
        print(f"Executing statement: {statement_id}")
        result = wait_for_statement(statement_id)
        print(f"Statement completed successfully")
        return result
    except Exception as e:
        print(f"Error executing statement: {str(e)}")
        raise


### Create Database Tables

Create the database tables in Reshift to store structured data sample with appropriate schema 

In [None]:
 # Wait a bit more for workgroup to be fully ready
print("Waiting additional time for workgroup to be fully ready...")
time.sleep(60)  # Wait 1 more minute

In [None]:
# Create tables in Redshift
def create_tables():
    """Create all necessary tables in Redshift"""
    
    # Orders table
    orders_sql = """
    CREATE TABLE IF NOT EXISTS orders (
        order_id VARCHAR(255) PRIMARY KEY,
        customer_id VARCHAR(255),
        order_total DECIMAL(10,2),
        order_status VARCHAR(50),
        payment_method VARCHAR(50),
        shipping_address TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    );
    """
    
    # Order Items table
    order_items_sql = """
    CREATE TABLE IF NOT EXISTS order_items (
        order_item_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        product_id VARCHAR(255),
        quantity INTEGER,
        price DECIMAL(10,2)
    );
    """
    
    # Payments table
    payments_sql = """
    CREATE TABLE IF NOT EXISTS payments (
        payment_id VARCHAR(255) PRIMARY KEY,
        order_id VARCHAR(255),
        customer_id VARCHAR(255),
        amount DECIMAL(10,2),
        payment_method VARCHAR(50),
        payment_status VARCHAR(50),
        created_at DATE
    );
    """
    
    # Reviews table
    reviews_sql = """
    CREATE TABLE IF NOT EXISTS reviews (
        review_id VARCHAR(255) PRIMARY KEY,
        product_id VARCHAR(255),
        customer_id VARCHAR(255),
        rating INTEGER,
        created_at DATE
    );
    """
    
    tables = {
        'orders': orders_sql,
        'order_items': order_items_sql,
        'payments': payments_sql,
        'reviews': reviews_sql
    }
    
    for table_name, sql in tables.items():
        print(f"Creating table: {table_name}")
        run_redshift_statement(sql)
        print(f"Created table: {table_name}")
        print("-------------")

# Create tables
create_tables()


### Load Data from S3 into Redshift Tables

Use the COPY command to efficiently load data from S3 CSV files into our Redshift tables:


In [None]:
# Load data from S3 into Redshift tables
def load_data_from_s3():
    """Load data from S3 CSV files into Redshift tables"""
    
    tables_and_files = {
        'orders': 'orders.csv',
        'order_items': 'order_items.csv',
        'payments': 'payments.csv',
        'reviews': 'reviews.csv'
    }
    
    for table_name, file_name in tables_and_files.items():
        print(f"Loading data into {table_name} from {file_name}")
        
        copy_sql = f"""
        COPY {table_name}
        FROM 's3://{S3_BUCKET}/{file_name}'
        IAM_ROLE '{redshift_role_arn}'
        CSV
        IGNOREHEADER 1
        DELIMITER ','
        REGION '{region}';
        """
        
        try:
            run_redshift_statement(copy_sql)
            print(f"Loaded data into {table_name}")
        except Exception as e:
            print(f"Error loading data into {table_name}: {str(e)}")

# Load data from S3
load_data_from_s3()

## Step 5: Create Bedrock Knowledge Base with Redshift Data Source

Now we'll create the Bedrock Knowledge Base configured to use our Redshift data as a structured data source.


These settings define the structured Knowledge Base identity and specify the foundation model that will generate SQL queries from natural language questions.

In [None]:
# Configure Knowledge Base parameters
kb_name = f"redshift-structured-kb-{suffix}"
kb_description = "Structured Knowledge Base for e-commerce data queries using Redshift"
generation_model = "global.anthropic.claude-haiku-4-5-20251001-v1:0"

print(f"Knowledge Base Name: {kb_name}")


Amazon Bedrock Knowledge Bases uses a service role to connect knowledge bases to structured data stores, retrieve data from these data stores, and generate SQL queries based on user queries and the structure of the data stores. There are several access patterns based on if you're using Redshift Serverless vs Redshift Provisioned Cluster. In this notebook, let's use `IAM Role + Redshift Serverless WorkGroup` access pattern.

### Configure Knowledge Base Configuration Parameters

The `kb_config_param` dictionary defines how the Knowledge Base connects to and queries your Redshift data warehouse. This configuration follows the Amazon Bedrock Knowledge Base API structure for structured data sources.

**Key Configuration Elements:**

- **type**: Set to `"SQL"` to indicate this is a structured data source
- **sqlKnowledgeBaseConfiguration**: Contains Redshift-specific settings
  - **storageConfigurations**: Defines the database connection (database name)
  - **queryEngineConfiguration**: Specifies Redshift Serverless workgroup and authentication method
  - **authConfiguration**: Authentication options include:
    - `"IAM"`: Uses IAM roles for secure, credential-free access (used in this workshop)
    - `"USERNAME_PASSWORD"`: Uses Secrets Manager to store database credentials
    - `"USERNAME"`: Uses username-only authentication with IAM

**Documentation:** For complete API reference and configuration options, see the [Amazon Bedrock Knowledge Base API documentation](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_agent_KnowledgeBaseConfiguration.html) and [Structured data source configuration guide](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-setup-structured.html).

In [None]:
# Configure Knowledge Base parameters for Redshift Serverless with IAM authentication
kb_config_param = {
    "type": "SQL",
    "sqlKnowledgeBaseConfiguration": {
        "type": "REDSHIFT",
        "redshiftConfiguration": {
            "storageConfigurations": [{
                "type": "REDSHIFT",
                "redshiftConfiguration": {
                    "databaseName": REDSHIFT_DATABASE
                }
            }],
            "queryEngineConfiguration": {
                "type": "SERVERLESS",
                "serverlessConfiguration": {
                    "workgroupArn": workgroup_arn,
                    "authConfiguration": {
                        "type": "IAM"
                    }
                }
            }
        }
    }
}

print(f"Knowledge Base configuration: {kb_config_param}")

### Create the Structured Knowledge Base

Use the BedrockStructuredKnowledgeBase utility to create the Knowledge Base with all necessary components:

In [None]:
try:
    structured_kb = BedrockStructuredKnowledgeBase(
        kb_name=kb_name,
        kb_description=kb_description,
        workgroup_arn=workgroup_arn,
        kbConfigParam=kb_config_param,
        generation_model=generation_model,
        suffix=suffix
    )
    
    print("Knowledge Base created successfully!")
    kb_id = structured_kb.get_knowledge_base_id()
    print(f"Knowledge Base ID: {kb_id}")
    
except Exception as e:
    print(f"Error creating Knowledge Base: {str(e)}")
    raise


## Step 6: Database Access Configuration for IAM Role + Redshift Serverless WorkGroup


Extract the IAM role name from the Knowledge Base service role ARN. This role name is needed to create a corresponding database user in Redshift that maps the IAM role to database-level permissions, enabling the Knowledge Base to query the database.

In [None]:
# Extract the IAM role name from the ARN for database user creation
kb_details = structured_kb.knowledge_base

bedrock_role_arn = kb_details['roleArn']
bedrock_role_name = bedrock_role_arn.split('/')[-1]
print(f"   Extracted Role Name: {bedrock_role_name}")

### Create IAM-based Database User in Redshift

Create a database user mapped to the Bedrock Knowledge Base IAM role to enable database access


In [None]:

# Create the IAM user in Redshift (this is the critical missing step!)
create_user_sql = f'CREATE USER "IAMR:{bedrock_role_name}" WITH PASSWORD DISABLE;'

try:
    print(f"Creating user: IAMR:{bedrock_role_name}")
    run_redshift_statement(create_user_sql)
    print("IAM user created successfully!")
except Exception as e:
    if "already exists" in str(e).lower():
        print("User already exists, continuing...")
    else:
        print(f"Error creating user: {str(e)}")
        raise

### Grant Database Permissions

Grant SELECT permissions on all tables to the IAM-based database user


In [None]:
# Grant SELECT on all tables in public schema
grant_select_sql = f'GRANT SELECT ON ALL TABLES IN SCHEMA public TO "IAMR:{bedrock_role_name}";'

try:
    print(f"Granting SELECT permissions to: IAMR:{bedrock_role_name}")
    run_redshift_statement(grant_select_sql)
    print("SELECT permissions granted successfully!")
except Exception as e:
    print(f"Error granting permissions: {str(e)}")
    raise

## Step 7: Start Ingestion Job

Now that the database permissions are properly configured, let's start the ingestion job to sync the data from the Redshift database.

Wait for the Knowledge Base to be fully provisioned, then start the ingestion job. This process analyzes the Redshift database schema and table structures, enabling the Knowledge Base to generate accurate SQL queries from natural language questions.

In [None]:
# Wait a bit for the Knowledge Base to be fully ready
time.sleep(60)
structured_kb.start_ingestion_job()

### Store Knowledge Base Configuration

Store the Knowledge Base ID and region in Jupyter's variable store for use in the next notebook (2.2-test-structured-kb.ipynb).

In [None]:
# Store the structured knowledge base configuration
structured_kb_id = structured_kb.get_knowledge_base_id()
structured_kb_region = region
structured_workgroup_arn = workgroup_arn
structured_database_name = REDSHIFT_DATABASE

# Store variables for use in main notebook
%store structured_kb_id
%store structured_kb_region
%store structured_workgroup_arn
%store structured_database_name

print("="*60)
print(f"Structured Knowledge Base ID: {structured_kb_id}")
print(f"Region: {structured_kb_region}")
print(f"Workgroup ARN: {structured_workgroup_arn}")
print(f"Database Name: {structured_database_name}")
print("="*60)
print("Configuration stored successfully!")


Display the Knowledge Base ID to verify it was created successfully before storing it in SSM Parameter Store.

In [None]:
structured_kb_id

The structured Knowledge Base ID is needed by the agent in Lab 3 to query the Redshift database. Storing it in SSM Parameter Store enables secure, centralized configuration sharing.

In [None]:
param_name = '/app/intelligent_rag/agentcore/structured_kb_id'

ssm = boto3.client("ssm")
ssm.put_parameter(Name=param_name, Value=structured_kb_id, Type="String", Overwrite=True)
print(f"Stored {structured_kb_id} in SSM: {param_name}")

## Summary

If all the above cells executed successfully, you have:

- Created Amazon Redshift Serverless namespace and workgroup infrastructure
- Set up an S3 bucket and uploaded sample structured data  
- Created database tables and loaded data from S3 using COPY commands
- Created an Amazon Bedrock Knowledge Base configured for structured data queries
- Configured IAM-based database access with proper permissions
- Successfully completed the data ingestion job 
- Stored the Knowledge Base configuration for use in the main notebook


You can now proceed to test the structured knowledge base with [2.2-test-structured-kb.ipynb](2.2-test-structured-kb.ipynb) notebook 
