In [None]:
import json
import os
import time
from typing import Any, Dict

import boto3
from dotenv import load_dotenv

In [None]:
# Load settings from a local .env file (if any) into the process environment.
load_dotenv()

# Deployment-specific settings. `region` is handed to boto3.client as region_name;
# `MODEL_ID` is read here but not referenced by the cells visible in this notebook.
region = os.environ.get('AWS_REGION')
MODEL_ID = os.environ.get('MODEL_ID')
role_arn = os.environ.get('ROLE_ARN')
lambda_arn = os.environ.get('LAMBDA_ARN')

# role_arn and lambda_arn are passed straight into the flow definition / create_flow
# call below, so fail here with a clear message instead of sending None to the API.
_missing_settings = [
    setting_name
    for setting_name, setting_value in (("ROLE_ARN", role_arn), ("LAMBDA_ARN", lambda_arn))
    if not setting_value
]
if _missing_settings:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

In [None]:
# Two clients are needed: "bedrock-agent" is used below to create, prepare,
# version and alias the flow; "bedrock-agent-runtime" is used to invoke it.
bedrock_agent, bedrock_agent_runtime = (
    boto3.client(service_name, region_name=region)
    for service_name in ('bedrock-agent', 'bedrock-agent-runtime')
)

In [13]:
# Name and human-readable description sent to create_flow.
flow_name = "webSearcher"
# The description should match what this flow does: it routes the input document
# through the web_search Lambda function node and returns that node's response.
flow_description = (
    "A flow that sends the input query to a web search Lambda function "
    "and returns the search results."
)

In [14]:
# Flow input node: the entry point of the flow. It declares a single output
# named "document" of type String, which is what invoke_flow must supply.
input_node = dict(
    type="Input",
    name="FlowInput",
    outputs=[
        dict(name="document", type="String"),
    ],
)

In [None]:
# Lambda function node: hands its single String input ("codeHookInput") to the
# Lambda identified by lambda_arn and exposes one String output
# ("functionResponse").
web_search = dict(
    name="web_search",
    type="LambdaFunction",
    configuration=dict(
        lambdaFunction=dict(lambdaArn=lambda_arn),
    ),
    inputs=[
        dict(name="codeHookInput", type="String", expression="$.data"),
    ],
    outputs=[
        dict(name="functionResponse", type="String"),
    ],
)

In [16]:
# Flow output node: receives one String input named "document" (taken from the
# incoming data via the "$.data" expression) and returns it as the flow result.
output_node = dict(
    type="Output",
    name="FlowOutput",
    inputs=[
        dict(name="document", type="String", expression="$.data"),
    ],
)

In [17]:
def _data_connection(name, source_node, target_node):
    """Build a "Data" connection from source_node's first output to target_node's first input."""
    return {
        "name": name,
        "source": source_node["name"],
        "target": target_node["name"],
        "type": "Data",
        "configuration": {
            "data": {
                "sourceOutput": source_node["outputs"][0]["name"],
                "targetInput": target_node["inputs"][0]["name"],
            }
        },
    }


# Wiring: FlowInput -> web_search -> FlowOutput.
# NOTE(review): the connection names ("Input_to_KB", "blog_to_output") do not
# describe the Lambda node they attach to; they are kept unchanged here.
connections = [
    _data_connection("Input_to_KB", input_node, web_search),
    _data_connection("blog_to_output", web_search, output_node),
]

In [18]:
# Assemble the flow graph (three nodes plus their connections), then register
# it under the configured name, description and execution role.
flow_definition = {
    "nodes": [input_node, web_search, output_node],
    "connections": connections,
}

response = bedrock_agent.create_flow(
    name=flow_name,
    description=flow_description,
    executionRoleArn=role_arn,
    definition=flow_definition,
)

In [19]:
# Identifier of the flow that was just created; every later call is keyed on it.
# Index directly so a response without "id" fails here rather than passing None
# on to prepare_flow.
flow_id = response["id"]

In [20]:
# prepare_flow only starts preparation: the recorded response is HTTP 202 with
# status 'Preparing'. Poll get_flow until the flow leaves that state so the
# version/alias cells below do not run against a flow that is not ready yet.
prepare_response = bedrock_agent.prepare_flow(flowIdentifier=flow_id)
flow_status = prepare_response.get("status")

PREPARE_POLL_SECONDS = 5   # pause between status checks
PREPARE_MAX_POLLS = 60     # upper bound so the cell cannot hang forever

for _poll in range(PREPARE_MAX_POLLS):
    if flow_status != "Preparing":
        break
    time.sleep(PREPARE_POLL_SECONDS)
    flow_status = bedrock_agent.get_flow(flowIdentifier=flow_id).get("status")

if flow_status != "Prepared":
    raise RuntimeError(
        f"Flow {flow_id} did not reach the 'Prepared' state (last status: {flow_status})."
    )

# Display the initial prepare_flow response; by this point the flow is Prepared.
prepare_response

{'ResponseMetadata': {'RequestId': 'b14a5f06-aaf7-4fa7-a2e1-1b43b3c01b8b',
  'HTTPStatusCode': 202,
  'HTTPHeaders': {'date': 'Sun, 30 Mar 2025 22:31:44 GMT',
   'content-type': 'application/json',
   'content-length': '40',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'b14a5f06-aaf7-4fa7-a2e1-1b43b3c01b8b',
   'x-amz-apigw-id': 'IQvsrF0xPHcEonA=',
   'x-amzn-trace-id': 'Root=1-67e9c650-77da0a707d4f6af808a46917'},
  'RetryAttempts': 0},
 'id': 'JNGCSF8R8M',
 'status': 'Preparing'}

In [47]:
# Create a version of the prepared flow.
response = bedrock_agent.create_flow_version(flowIdentifier=flow_id)

# The alias created in the next cell routes to this version. Index directly so a
# response without "version" fails here rather than sending None as flowVersion.
flow_version = response["version"]

In [48]:
# Create an alias named "latest" that routes to the version created above.
alias_routing = [{"flowVersion": flow_version}]

response = bedrock_agent.create_flow_alias(
    flowIdentifier=flow_id,
    name="latest",
    description="Alias pointing to the latest version of the flow.",
    routingConfiguration=alias_routing,
)

# invoke_flow needs the alias id. Index directly so a response without "id"
# fails here rather than passing None as flowAliasIdentifier.
flow_alias_id = response["id"]

In [None]:
# Invoke the flow through its alias. The "document" value must be a string:
# FlowInput declares its "document" output as String, and the service rejects
# other shapes with a validationException ("Expected type ... STRING. Actual
# type is OBJECT.").
response = bedrock_agent_runtime.invoke_flow(
    flowIdentifier=flow_id,
    flowAliasIdentifier=flow_alias_id,
    inputs=[
        {
            "content": {
                "document": "usainblot"
            },
            "nodeName": "FlowInput",
            "nodeOutputName": "document"
        }
    ]
)

# `result` keeps the last event of each kind, as before. Outputs and the
# completion reason are also collected explicitly so that repeated output
# events are not overwritten and a missing event cannot raise a KeyError.
result = {}
flow_outputs = []
completion_reason = None

for event in response.get("responseStream"):
    result.update(event)
    if "flowOutputEvent" in event:
        flow_outputs.append(event["flowOutputEvent"]["content"]["document"])
    if "flowCompletionEvent" in event:
        completion_reason = event["flowCompletionEvent"]["completionReason"]

if completion_reason == 'SUCCESS':
    print("Flow invocation was successful! The output of the flow is as follows:\n")
    for flow_output in flow_outputs:
        print(flow_output)

else:
    # completion_reason is None when the stream ended without a flowCompletionEvent.
    print("The flow invocation completed because of the following reason:", completion_reason)

EventStreamError: An error occurred (validationException) when calling the InvokeFlow operation: Expected type for Node Output document of Node FlowInput to be STRING. Actual type is OBJECT.