
Commit 6c404f9

Merge pull request #5 from KyMidd/feature/add-streaming
add streaming, improve error handling
2 parents 9f1adaa + d32a33b commit 6c404f9

1 file changed: +108 −39 lines changed

python/devopsbot.py

Lines changed: 108 additions & 39 deletions
@@ -12,7 +12,6 @@
 import logging
 import boto3
 import json
-import base64
 import requests
 import re

@@ -72,7 +71,7 @@ def ask_bedrock_llm_with_knowledge_base(flat_conversation, knowledge_base_id) ->
         region_name=model_region_name
     )
 
-    # uses embedding model to retrieve and generate a response
+    # Uses model to retrieve related vectors from knowledge base
     response = bedrock_agent_runtime_client.retrieve(
         retrievalQuery={
             'text': flat_conversation
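
For reference, retrieve returns matched knowledge-base chunks rather than a generated answer. A minimal sketch of reading the results back out of the standard bedrock-agent-runtime response shape (the region and knowledge base ID below are placeholders, not values from this repo):

    import boto3

    bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name="us-west-2")  # placeholder region

    response = bedrock_agent_runtime_client.retrieve(
        knowledgeBaseId="EXAMPLEKBID",  # hypothetical knowledge base ID
        retrievalQuery={"text": "How do we rotate access keys?"},
    )

    # Each retrievalResult carries the matched chunk text, its source location, and a relevance score
    for result in response["retrievalResults"]:
        print(result["content"]["text"], result.get("score"))

The function in this file presumably flattens these chunks into the prompt it sends to the model; the sketch only shows the raw response shape.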
@@ -175,8 +174,48 @@ def register_slack_app(token, signing_secret):
     return app, registered_bot_id
 
 
-# Function to handle ai request input and response
-def ai_request(bedrock_client, messages):
+# Receives the streaming response and updates the slack message, chunk by chunk
+def response_on_slack(client, streaming_response, initial_response, channel_id, thread_ts):
+
+    # Print streaming response
+    if os.environ.get("VERA_DEBUG", "False") == "True":
+        print("🚀 Streaming response:", streaming_response["stream"])
+
+    # Counter and buffer vars for streaming response
+    response = ""
+    token_counter = 0
+    buffer = ""
+
+    # Iterate over streamed chunks
+    for chunk in streaming_response["stream"]:
+        if "contentBlockDelta" in chunk:
+            text = chunk["contentBlockDelta"]["delta"]["text"]
+            response += text
+            buffer += text
+            token_counter += 1
+
+            if token_counter >= 10:
+                client.chat_update(
+                    text=response,
+                    channel=channel_id,
+                    ts=initial_response['ts']
+                )
+                # Every time we update to slack, we zero out the token counter and buffer
+                token_counter = 0
+                buffer = ""
+
+    # If buffer contains anything after iterating over any chunks, add it also
+    # This completes the update
+    if buffer:
+        client.chat_update(
+            text=response,
+            channel=channel_id,
+            ts=initial_response['ts']
+        )
+
+
+# Handle ai request input and response
+def ai_request(bedrock_client, messages, say, thread_ts, client, initial_response, channel_id):
 
     # Format model system prompt for the request
     model_prompt = [
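
The new response_on_slack helper leans on the shape of Bedrock's ConverseStream output, where generated text arrives incrementally in contentBlockDelta events. A minimal sketch of consuming that stream on its own, without the Slack plumbing (the region and model ID are placeholders):

    import boto3

    bedrock_client = boto3.client("bedrock-runtime", region_name="us-west-2")  # placeholder region

    streaming_response = bedrock_client.converse_stream(
        modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",  # placeholder model ID
        messages=[{"role": "user", "content": [{"text": "Say hello"}]}],
    )

    # Text deltas arrive in contentBlockDelta events; other event types
    # (messageStart, contentBlockStop, messageStop, metadata) are ignored here
    response = ""
    for chunk in streaming_response["stream"]:
        if "contentBlockDelta" in chunk:
            response += chunk["contentBlockDelta"]["delta"]["text"]

    print(response)

The token_counter threshold of 10 in the diff batches roughly ten deltas per chat_update call, presumably to stay under Slack's message-update rate limits while still feeling live.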
@@ -194,13 +233,13 @@ def ai_request(bedrock_client, messages):
     additional_model_fields = {
         "top_k": top_k
    }
-
+
     # If enable_guardrails is set to True, include guardrailIdentifier and guardrailVersion in the request
     if enable_guardrails:
 
         # Try to make the request
         try:
-            response = bedrock_client.converse(
+            streaming_response = bedrock_client.converse_stream(
                 modelId=model_id,
                 guardrailConfig={
                     "guardrailIdentifier": guardrailIdentifier,
@@ -211,31 +250,38 @@ def ai_request(bedrock_client, messages):
                 inferenceConfig=inference_config,
                 additionalModelRequestFields=additional_model_fields
             )
-            # Find the response text
-            response = response["output"]["message"]["content"][0]["text"]
+
+            # Call function to respond on slack
+            response_on_slack(client, streaming_response, initial_response, channel_id, thread_ts)
+
         except Exception as error:
             # If the request fails, print the error
             print(f"🚀 Error making request to Bedrock: {error}")
 
             # Clean up error message, grab everything after the first :
-            error = str(error).split(":", 1)[1]
+            error = str(error).split(":")[-1].strip()
 
             # Return error as response
-            response = "😔 Error with request: " + str(error)
+            say(
+                text="😔 Error with request: " + str(error),
+                thread_ts=thread_ts,
+            )
 
-    # If enable_guardrails is set to False, do not include guardrailIdentifier and guardrailVersion in the request
+    # If enable_guardrails is set to False, do not include guardrailIdentifier and guardrailVersion in the request
     else:
         # Try to make the request
         try:
-            response = bedrock_client.converse(
+            streaming_response = bedrock_client.converse_stream(
                 modelId=model_id,
                 messages=messages,
                 system=model_prompt,
                 inferenceConfig=inference_config,
                 additionalModelRequestFields=additional_model_fields
             )
-            # Find the response text
-            response = response["output"]["message"]["content"][0]["text"]
+
+            # Respond on slack
+            response_on_slack(client, streaming_response, initial_response, channel_id, thread_ts)
+
         except Exception as error:
             # If the request fails, print the error
             print(f"🚀 Error making request to Bedrock: {error}")
@@ -244,10 +290,10 @@ def ai_request(bedrock_client, messages):
             error = str(error).split(":", 1)[1]
 
             # Return error as response
-            response = "😔 Error with request: " + str(error)
-
-    # Return the response
-    return response
+            say(
+                text="😔 Error with request: " + str(error),
+                thread_ts=thread_ts,
+            )
 
 
 # Check for duplicate events
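
A side note on the error cleanup: the guardrail branch switches from split(":", 1)[1] (everything after the first colon) to split(":")[-1].strip() (everything after the last colon, trimmed), while the non-guardrail branch above still uses the original form. A quick illustration with a made-up error string:

    # Hypothetical error text, just to show the difference between the two cleanups
    error = "ClientError: ValidationException: Input is too long"

    print(str(error).split(":", 1)[1])
    # " ValidationException: Input is too long"  (everything after the first colon, leading space kept)

    print(str(error).split(":")[-1].strip())
    # "Input is too long"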
@@ -258,11 +304,16 @@ def check_for_duplicate_event(headers, payload):
         print("🚀 Headers:", headers)
         print("🚀 Payload:", payload)
 
+    # Checking for webhook when we edit our own message, which happens all the time with streaming tokens
+    if payload.get("event", {}).get("subtype") == "message_changed":
+        print("Detected a message changed event, discarding")
+        logging.info("Detected a message changed event, discarding")
+        return True
+
     # Check headers, if x-slack-retry-num is present, this is a re-send
     # Really we should be doing async lambda model, but for now detecting resends and exiting
     if "x-slack-retry-num" in headers:
-        print("❌ Detected a re-send, exiting")
-        logging.info("❌ Detected a re-send, exiting")
+        print("Detected a Slack re-try, exiting")
         return True
 
     # Check if edited message in local development
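
Context for the new subtype guard: every chat_update the bot makes while streaming causes Slack to deliver a message event with subtype message_changed back to the app, so without this check each streamed edit would re-trigger the handler. A sketch of the payload shape being filtered (field values are illustrative only):

    # Illustrative Slack Events API payload for an edited message
    payload = {
        "event": {
            "type": "message",
            "subtype": "message_changed",
            "channel": "C0123456789",
            "message": {"text": "partially streamed reply", "ts": "1712345678.000100"},
        }
    }

    # Same guard as check_for_duplicate_event: drop events caused by our own streaming edits
    if payload.get("event", {}).get("subtype") == "message_changed":
        print("Detected a message changed event, discarding")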
@@ -505,7 +556,7 @@ def build_conversation_content(payload, token):
 # Common function to handle both DMs and app mentions
 def handle_message_event(client, body, say, bedrock_client, app, token, registered_bot_id):
 
-    #user_id = body["event"]["user"]
+    channel_id = body["event"]["channel"]
     event = body["event"]
 
     # Determine the thread timestamp
@@ -605,6 +656,12 @@ def handle_message_event(client, body, say, bedrock_client, app, token, registered_bot_id):
     if enable_knowledge_base:
         print("🚀 Knowledge base enabled, fetching citations")
 
+        # Respond to the user that we're fetching citations
+        initial_response = say(
+            text=f"Checking the knowledge base :waiting:",
+            thread_ts=thread_ts,
+        )
+
         if os.environ.get("VERA_DEBUG", "False") == "True":
             print("🚀 State of conversation before AI request:", conversation)
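
The placeholder message works because say() in Slack Bolt returns the underlying chat.postMessage response, and its ts can be fed to chat_update to edit that same message later. A minimal sketch of the pattern, assuming say, client, channel_id, and thread_ts come from a Bolt event handler as they do in this file:

    # Post a placeholder in the thread, then edit it in place as work progresses
    initial_response = say(
        text="Checking the knowledge base :waiting:",
        thread_ts=thread_ts,
    )

    # say() returns the chat.postMessage response; its 'ts' identifies the message to edit
    client.chat_update(
        text="Chatting with the AI :waiting:",
        channel=channel_id,
        ts=initial_response["ts"],
    )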

@@ -623,7 +680,22 @@ def handle_message_event(client, body, say, bedrock_client, app, token, registered_bot_id):
             print(f"🚀 Flat conversation: {flat_conversation}")
 
         # Get context data from the knowledge base
-        knowledge_base_response = ask_bedrock_llm_with_knowledge_base(flat_conversation, ConfluenceKnowledgeBaseId)
+        try:
+            knowledge_base_response = ask_bedrock_llm_with_knowledge_base(flat_conversation, ConfluenceKnowledgeBaseId)
+        except Exception as error:
+            # If the request fails, print the error
+            print(f"🚀 Error making request to Bedrock: {error}")
+
+            # Split the error message at a colon, grab everything after the third colon
+            error = str(error).split(":", 2)[-1].strip()
+
+            # Return error as response
+            client.chat_update(
+                text=f"😔 Error fetching from knowledge base: " + error,
+                channel=channel_id,
+                ts=initial_response['ts'],
+            )
+            return
 
         if os.environ.get("VERA_DEBUG", "False") == "True":
             print(f"🚀 Knowledge base response: {knowledge_base_response}")
@@ -645,26 +717,23 @@ def handle_message_event(client, body, say, bedrock_client, app, token, registered_bot_id):
         }
     )
 
+    # Update the initial response
+    if enable_knowledge_base:
+        client.chat_update(
+            text=f"Chatting with the AI :waiting:",
+            channel=channel_id,
+            ts=initial_response['ts'],
+        )
+    else:
+        initial_response = say(
+            text=f"Chatting with the AI :waiting:",
+            thread_ts=thread_ts,
+        )
+
     # Call the AI model with the conversation
     if os.environ.get("VERA_DEBUG", "False") == "True":
         print("🚀 State of conversation before AI request:", conversation)
-    response_text = ai_request(bedrock_client, conversation)
-
-    # Check if unsupported_file_type_found
-    if unsupported_file_type_found == True:
-        # If true, prepend error to response text
-        response_text = f"> `Error`: Unsupported file type found, please ensure you are sending a supported file type. Supported file types are: images (png, jpeg, gif, webp).\n{
-            response_text}"
-
-    if os.environ.get("VERA_DEBUG", "False") == "True":
-        print("🚀 Response text after adding errors:", response_text)
-
-    # Return response in the thread
-    say(
-        # text=f"Oh hi <@{user_id}>!\n\n{response_text}",
-        text=f"{response_text}",
-        thread_ts=thread_ts,
-    )
+    ai_request(bedrock_client, conversation, say, thread_ts, client, initial_response, channel_id)
 
     # Print success
     print("🚀 Successfully responded to message, exiting")
