In [None]:
import boto3
import botocore.exceptions
import time

**Upgrade boto3/botocore** (required for MSK CreateTopic API). In SageMaker Studio, pip may report dependency conflicts with other pre-installed packages (e.g. autogluon, aiobotocore, sagemaker-studio-analytics-extension). You can **ignore those messages** for this notebook. Confirm you see `Successfully installed boto3-... botocore-...`. If you later see `AttributeError: 'Kafka' object has no attribute 'create_topic'`, restart the kernel and run the cells again.

In [None]:
# CreateTopic API requires boto3 >= 1.42.46
# Dependency conflict messages from other Studio packages can be ignored; look for "Successfully installed boto3-... botocore-..."
!pip install 'boto3>=1.42.46' 'botocore>=1.42.46'

In [None]:
# Name of the CloudFormation stack queried by the lookup cells below.
StackName = "BedrockStreamIngest"
# Name of the Kafka topic created on the MSK cluster later in this notebook.
KafkaTopic = "streamtopic"

In [None]:
# Get Knowledge Base and Data Source names from CloudFormation stack outputs.
# Defines KBName and DSName; raises ValueError if either output is missing.

cf_client = boto3.client('cloudformation')

try:
    describe_stacks_response = cf_client.describe_stacks(
        StackName=StackName
    )
except botocore.exceptions.ClientError as error:
    print(error)
    raise

# The 'Outputs' key may be absent when the stack defines no outputs, so fall
# back to an empty list instead of failing with a KeyError.
stack_outputs = {
    output['OutputKey']: output['OutputValue']
    for output in describe_stacks_response['Stacks'][0].get('Outputs', [])
}

KBName = stack_outputs.get('KnowledgeBaseName')
DSName = stack_outputs.get('DataSourceName')

print('Knowledge Base Name:', KBName)
print('Data Source Name:', DSName)

# Fail fast with a clear message: later cells use these names in string
# comparisons, where a None value would only surface as an opaque TypeError.
missing_outputs = [
    key
    for key, value in (('KnowledgeBaseName', KBName), ('DataSourceName', DSName))
    if value is None
]
if missing_outputs:
    raise ValueError(
        f'Stack "{StackName}" is missing expected output(s): {", ".join(missing_outputs)}'
    )

In [None]:
# Look up the MSK cluster ARN: it is the physical ID of the stack's
# 'MSKCluster' resource (created via CloudFormation).

cf_client = boto3.client('cloudformation')

try:
    describe_stack_resource_response = cf_client.describe_stack_resource(
        StackName=StackName,
        LogicalResourceId='MSKCluster',
    )
except botocore.exceptions.ClientError as error:
    # Show the service error, then let it propagate to stop the cell.
    print(error)
    raise
else:
    resource_detail = describe_stack_resource_response['StackResourceDetail']
    MSKClusterArn = resource_detail['PhysicalResourceId']
    print('MSK Cluster ARN:', MSKClusterArn)

In [None]:
# Fetch the client connection string of the MSK cluster (created via CloudFormation)
# and keep it in BootstrapBrokerString.

kafka_client = boto3.client('kafka')

try:
    get_bootstrap_brokers_response = kafka_client.get_bootstrap_brokers(ClusterArn=MSKClusterArn)
except botocore.exceptions.ClientError as error:
    # Show the service error, then let it propagate to stop the cell.
    print(error)
    raise
else:
    BootstrapBrokerString = get_bootstrap_brokers_response['BootstrapBrokerString']
    print(BootstrapBrokerString)

Create Kafka topic using MSK API (no Kafka client installation required)

Amazon MSK now provides a CreateTopic API that lets you create Kafka topics programmatically without setting up a Kafka admin client. This eliminates the need to install Java, a Kafka client, and aws-msk-iam-auth.

In [None]:
# Create the Kafka topic through the MSK CreateTopic API.
# A ConflictException is reported as "already exists" and does not stop the cell.

kafka_client = boto3.client('kafka')

try:
    create_topic_response = kafka_client.create_topic(
        ClusterArn=MSKClusterArn,
        TopicName=KafkaTopic,
        PartitionCount=1,
        ReplicationFactor=2,
    )
except botocore.exceptions.ClientError as error:
    if error.response['Error']['Code'] != 'ConflictException':
        # Any other service error is shown and propagated.
        print(error)
        raise
    print(f'Topic "{KafkaTopic}" already exists')
else:
    # Echo the fields of the response, one per line.
    for label, field in (('Topic ARN:', 'TopicArn'), ('Topic Name:', 'TopicName'), ('Status:', 'Status')):
        print(label, create_topic_response[field])

In [None]:
# Verify the created topics by listing them with the MSK ListTopics API
# (a single request for up to 100 topics).

kafka_client = boto3.client('kafka')

try:
    list_topics_response = kafka_client.list_topics(ClusterArn=MSKClusterArn, MaxResults=100)
except botocore.exceptions.ClientError as error:
    # Show the service error, then let it propagate to stop the cell.
    print(error)
    raise
else:
    listed_topics = list_topics_response["Topics"]
    print(f'Total topics: {len(listed_topics)}')
    print('\nTopics:')
    for topic in listed_topics:
        topic_name = topic["TopicName"]
        partition_count = topic["PartitionCount"]
        replication_factor = topic["ReplicationFactor"]
        print(f'  - {topic_name} (Partitions: {partition_count}, Replication: {replication_factor})')

Create a Bedrock Knowledge Base via the AWS console

- Navigate to the "Knowledge Bases" page of the Amazon Bedrock service in the AWS console.
- Choose "Create", then select the "Knowledge Base with vector store" option.
- For "Knowledge Base name", use the value of the KBName variable printed above (default: BedrockStreamIngestKnowledgeBase).
- For Data Source, choose "Custom".
- On the Data Source configuration page, use the value of the DSName variable printed above (default: BedrockStreamIngestKBCustomDS) as the data source name.
- For Embeddings Model, choose "Titan Text Embeddings v2".
- Leave the remaining settings at their defaults and choose "Create Knowledge Base".
- Wait until the Knowledge Base has been created.

In [None]:
# Get Knowledge Base ID (Created manually).
# Defines KBId; raises RuntimeError if no Knowledge Base matches KBName.

bedrock_agent_client = boto3.client('bedrock-agent')

try:
    # Follow nextToken so Knowledge Bases beyond the first page are not missed.
    knowledge_base_summaries = []
    kb_list_kwargs = {'maxResults': 100}
    while True:
        kb_page = bedrock_agent_client.list_knowledge_bases(**kb_list_kwargs)
        knowledge_base_summaries.extend(kb_page['knowledgeBaseSummaries'])
        kb_next_token = kb_page.get('nextToken')
        if not kb_next_token:
            break
        kb_list_kwargs['nextToken'] = kb_next_token

except botocore.exceptions.ClientError as error:
    print(error)
    raise

# Prefer an exact name match; fall back to a substring match so that a
# Knowledge Base whose name merely contains KBName is still found.
kb_matches = (
    [kb for kb in knowledge_base_summaries if kb['name'] == KBName]
    or [kb for kb in knowledge_base_summaries if KBName in kb['name']]
)
if not kb_matches:
    # Stop here rather than continuing with a placeholder ID.
    raise RuntimeError(
        f'No Knowledge Base matching "{KBName}" was found; create it in the console first.'
    )

# With several matches the last one listed is used.
KBId = kb_matches[-1]['knowledgeBaseId']

print(KBId)

In [None]:
# Get the custom data source ID (Created manually).
# Defines DSId; raises RuntimeError if no data source matches DSName.

bedrock_agent_client = boto3.client('bedrock-agent')

try:
    # Follow nextToken so data sources beyond the first page are not missed.
    data_source_summaries = []
    ds_list_kwargs = {'knowledgeBaseId': KBId}
    while True:
        ds_page = bedrock_agent_client.list_data_sources(**ds_list_kwargs)
        data_source_summaries.extend(ds_page['dataSourceSummaries'])
        ds_next_token = ds_page.get('nextToken')
        if not ds_next_token:
            break
        ds_list_kwargs['nextToken'] = ds_next_token

except botocore.exceptions.ClientError as error:
    print(error)
    raise

# Prefer an exact name match; fall back to a substring match so that a
# data source whose name merely contains DSName is still found.
ds_matches = (
    [ds for ds in data_source_summaries if ds['name'] == DSName]
    or [ds for ds in data_source_summaries if DSName in ds['name']]
)
if not ds_matches:
    # Stop here rather than continuing with a placeholder ID.
    raise RuntimeError(
        f'No data source matching "{DSName}" was found in Knowledge Base {KBId}.'
    )

# With several matches the last one listed is used.
DSId = ds_matches[-1]['dataSourceId']

print(DSId)

In [None]:
# Look up the consumer Lambda function name: it is the physical ID of the stack's
# 'KafkaConsumerLambdaFunction' resource (created via CloudFormation).

cf_client = boto3.client('cloudformation')

try:
    describe_stack_resource_response = cf_client.describe_stack_resource(
        StackName=StackName,
        LogicalResourceId='KafkaConsumerLambdaFunction',
    )
except botocore.exceptions.ClientError as error:
    # Show the service error, then let it propagate to stop the cell.
    print(error)
    raise
else:
    lambda_resource_detail = describe_stack_resource_response['StackResourceDetail']
    LambdaFunctionName = lambda_resource_detail['PhysicalResourceId']
    print('Lambda Function Name:', LambdaFunctionName)

In [None]:
# Set the KB ID and DS ID as Environment Variables of the consumer Lambda function.
# Existing environment variables of the function are preserved.

lambda_client = boto3.client('lambda')

# Refuse to write placeholder values: an unresolved ID would misconfigure the
# consumer without raising any error here.
unresolved_ids = [
    name
    for name, value in (('KBId', KBId), ('DSId', DSId))
    if value in (None, 'None')
]
if unresolved_ids:
    raise ValueError(f'Cannot configure the Lambda function; unresolved value(s): {", ".join(unresolved_ids)}')

try:
    # update_function_configuration replaces the whole set of environment
    # variables, so read the current ones first and merge the new keys in.
    current_configuration = lambda_client.get_function_configuration(
        FunctionName=LambdaFunctionName
    )
    merged_variables = dict(
        current_configuration.get('Environment', {}).get('Variables', {})
    )
    merged_variables.update({
        'KBID': KBId,
        'DSID': DSId
    })

    update_function_configuration_response = lambda_client.update_function_configuration(
        FunctionName=LambdaFunctionName,
        Environment={
            'Variables': merged_variables
        }
    )
    print(update_function_configuration_response)

except botocore.exceptions.ClientError as error:
    print(error)
    raise

In [None]:
# Read the consumer Lambda function back and print its environment variables
# to confirm they are set.

lambda_client = boto3.client('lambda')

try:
    get_function_response = lambda_client.get_function(FunctionName=LambdaFunctionName)
except botocore.exceptions.ClientError as error:
    # Show the service error, then let it propagate to stop the cell.
    print(error)
    raise
else:
    function_configuration = get_function_response['Configuration']
    print(function_configuration['Environment']['Variables'])


In [None]:
# Add MSK topic trigger to the consumer Lambda function.
# Re-running the cell is safe: an already existing mapping is reported, not raised.

lambda_client = boto3.client('lambda')

try:
    create_event_source_mapping_response = lambda_client.create_event_source_mapping(
        EventSourceArn=MSKClusterArn,
        FunctionName=LambdaFunctionName,
        StartingPosition='LATEST',
        Enabled=True,
        # Use the configured topic name so the trigger follows KafkaTopic
        # instead of a separately hard-coded copy of it.
        Topics=[KafkaTopic]
    )
    print(create_event_source_mapping_response)
except botocore.exceptions.ClientError as error:
    if error.response['Error']['Code'] == 'ResourceConflictException':
        # Mirrors the create-topic cell: a conflict does not stop the notebook.
        # The service message is printed because it describes the conflict.
        print(f'Event source mapping for topic "{KafkaTopic}" was not created due to a conflict:')
        print(error)
    else:
        print(error)
        raise

In [None]:
# Ensure the MSK trigger is fully enabled.
# Polls every 30 seconds until every mapping between the function and the
# cluster reports "Enabled" or "Disabled", for at most 3 hours.

lambda_client = boto3.client('lambda')

# States at which polling stops.
settled_states = {"Enabled", "Disabled"}

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    try:
        list_event_source_mappings_response = lambda_client.list_event_source_mappings(
            FunctionName = LambdaFunctionName,
            EventSourceArn=MSKClusterArn
        )
    except botocore.exceptions.ClientError as error:
        print(error)
        raise

    mappings = list_event_source_mappings_response['EventSourceMappings']
    if not mappings:
        # Nothing to wait for; fail with a clear message instead of a NameError.
        raise RuntimeError(
            f'No event source mapping found for function "{LambdaFunctionName}" and cluster {MSKClusterArn}.'
        )

    for mapping in mappings:
        print('Event source mapping UUID:', mapping['UUID'])
        print('Event source enablment status:', mapping['State'])

    # Check every mapping, not only the last one listed.
    if all(mapping['State'] in settled_states for mapping in mappings):
        break
    time.sleep(30)
else:
    # The loop ended by timeout rather than by break.
    print('Timed out waiting for the event source mapping to reach a settled state.')

In [None]:
# Persist the notebook's key variables with IPython's %store magic so that
# another notebook or kernel session can restore them with `%store -r`.
%store StackName
%store KafkaTopic
%store KBName
%store DSName
%store LambdaFunctionName
%store KBId
%store DSId
%store BootstrapBrokerString
%store MSKClusterArn