Add Streaming Support for Inline-Query Callback Responses #235

Merged · 2 commits · Apr 16, 2023
108 changes: 85 additions & 23 deletions bot/telegram_bot.py
@@ -243,7 +243,7 @@ async def _generate():
parse_mode=constants.ParseMode.MARKDOWN
)

-            await self.wrap_with_indicator(update, context, constants.ChatAction.UPLOAD_PHOTO, _generate)
+            await self.wrap_with_indicator(update, context, _generate, constants.ChatAction.UPLOAD_PHOTO)

async def transcribe(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
@@ -368,7 +368,7 @@ async def _execute():
if os.path.exists(filename):
os.remove(filename)

-            await self.wrap_with_indicator(update, context, constants.ChatAction.TYPING, _execute)
+            await self.wrap_with_indicator(update, context, _execute, constants.ChatAction.TYPING)

async def prompt(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
@@ -512,7 +512,7 @@ async def _reply():
except Exception as exception:
raise exception

-            await self.wrap_with_indicator(update, context, constants.ChatAction.TYPING, _reply)
+            await self.wrap_with_indicator(update, context, _reply, constants.ChatAction.TYPING)

self.add_chat_request_to_usage_tracker(user_id, total_tokens)

@@ -580,8 +580,9 @@ async def handle_callback_inline_query(self, update: Update, context: CallbackContext):
try:
if callback_data.startswith(callback_data_suffix):
unique_id = callback_data.split(':')[1]
+                total_tokens = 0

-                # Retrieve the long text from the cache
+                # Retrieve the prompt from the cache
query = self.inline_queries_cache.get(unique_id)
if query:
self.inline_queries_cache.pop(unique_id)
@@ -590,30 +591,90 @@
f'{localized_text("error", bot_language)}. '
f'{localized_text("try_again", bot_language)}'
)
-                    await self.edit_message_with_retry(context,
-                                                       chat_id=None,
-                                                       message_id=inline_message_id,
+                    await self.edit_message_with_retry(context, chat_id=None, message_id=inline_message_id,
text=f'{query}\n\n_{answer_tr}:_\n{error_message}',
is_inline=True)
return

-                # Edit the current message to indicate that the answer is being processed
-                await context.bot.edit_message_text(inline_message_id=inline_message_id,
-                                                    text=f'{query}\n\n_{answer_tr}:_\n{loading_tr}',
-                                                    parse_mode=constants.ParseMode.MARKDOWN)
+                if self.config['stream']:
+                    stream_response = self.openai.get_chat_response_stream(chat_id=user_id, query=query)
+                    i = 0
+                    prev = ''
+                    sent_message = None
+                    backoff = 0
+                    async for content, tokens in stream_response:
+                        if len(content.strip()) == 0:
+                            continue
Contributor: Why not log?

Contributor Author: And here it is the same as in the other comment.

Owner: No need to log anything here. Sometimes the OpenAI stream response will just be whitespace at the beginning; I simply ignore it and wait for the next chunk.


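To illustrate the behavior the owner describes, here is a minimal, self-contained sketch — `stream_chunks` and `consume` are hypothetical stand-ins for `openai.get_chat_response_stream` and the loop above, not the project's actual code — showing why whitespace-only chunks at the head of a stream can simply be skipped:

```python
import asyncio

async def stream_chunks():
    # Hypothetical stand-in for openai.get_chat_response_stream():
    # streamed answers often begin with one or two whitespace-only chunks.
    for draft in ['\n', '  ', 'Hello', 'Hello, wor', 'Hello, world!']:
        yield draft, 'not_finished'
    yield 'Hello, world!', '42'  # the final chunk carries the total token count

async def consume():
    async for content, tokens in stream_chunks():
        if len(content.strip()) == 0:
            continue  # nothing worth rendering yet; wait for the next chunk
        print(repr(content), tokens)

asyncio.run(consume())
```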
-                logging.info(f'Generating response for inline query by {name}')
-                response, used_tokens = await self.openai.get_chat_response(chat_id=user_id, query=query)
-                self.add_chat_request_to_usage_tracker(user_id, used_tokens)
+                        cutoff = 180 if len(content) > 1000 else 120 if len(content) > 200 else 90 if len(content) > 50 else 50
+                        cutoff += backoff
+
+                        if i == 0:
+                            try:
+                                if sent_message is not None:
+                                    await self.edit_message_with_retry(context, chat_id=None,
+                                                                       message_id=inline_message_id,
+                                                                       text=f'{query}\n\n_{answer_tr}:_\n{content}',
+                                                                       is_inline=True)
+                            except:
+                                continue
Contributor: Why not log?

Contributor Author: @jvican good question. I used the logic from the main prompt function as the base for the inline query streaming. I think we can add logging to both and also try to combine the semi-duplicated code.

Owner: +1 for future logic isolation! Yes, we can add a log here, no problem. I didn't do that initially because these operations (delete + send initial message) failed very rarely in my testing, and even if they did, they would just be retried when the next chunk comes in.


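Since the cutoff/backoff throttle above is the heart of this change, here is a minimal, runnable sketch of the same idea with invented names (`edit_cutoff`, `send_edit`, and `stream_with_throttle` are illustrations, not the repository's API): an edit is only pushed when the draft has grown past a length-dependent cutoff, and every failed edit widens that cutoff via `backoff`:

```python
import asyncio
import random

def edit_cutoff(length: int, backoff: int) -> int:
    # Longer drafts are edited less often; backoff widens the gap further.
    base = 180 if length > 1000 else 120 if length > 200 else 90 if length > 50 else 50
    return base + backoff

async def send_edit(text: str) -> None:
    # Hypothetical stand-in for edit_message_with_retry(); occasionally "rate-limits".
    if random.random() < 0.2:
        raise RuntimeError('flood control')
    print(f'edited message -> {len(text)} chars')

async def stream_with_throttle(chunks) -> None:
    prev, backoff = '', 0
    for content, finished in chunks:
        if abs(len(content) - len(prev)) > edit_cutoff(len(content), backoff) or finished:
            prev = content
            try:
                await send_edit(content)
            except RuntimeError:
                backoff += 5  # require more growth before attempting the next edit
                continue
        await asyncio.sleep(0.01)

# Simulate progressively longer drafts; only the last one is marked finished.
drafts = ['word ' * n for n in (8, 24, 48, 96, 200)]
asyncio.run(stream_with_throttle([(d, d is drafts[-1]) for d in drafts]))
```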
+                        elif abs(len(content) - len(prev)) > cutoff or tokens != 'not_finished':
+                            prev = content
+                            try:
+                                use_markdown = tokens != 'not_finished'
+                                await self.edit_message_with_retry(context,
+                                                                   chat_id=None, message_id=inline_message_id,
+                                                                   text=f'{query}\n\n_{answer_tr}:_\n{content}',
+                                                                   markdown=use_markdown, is_inline=True)
+
+                            except RetryAfter as e:
+                                backoff += 5
+                                await asyncio.sleep(e.retry_after)
+                                continue
+                            except TimedOut:
+                                backoff += 5
+                                await asyncio.sleep(0.5)
+                                continue
+                            except Exception:
+                                backoff += 5
+                                continue
+
+                            await asyncio.sleep(0.01)
+
+                        i += 1
+                        if tokens != 'not_finished':
+                            total_tokens = int(tokens)
+
+                else:
+                    async def _send_inline_query_response():
+                        nonlocal total_tokens
+                        # Edit the current message to indicate that the answer is being processed
+                        await context.bot.edit_message_text(inline_message_id=inline_message_id,
+                                                            text=f'{query}\n\n_{answer_tr}:_\n{loading_tr}',
+                                                            parse_mode=constants.ParseMode.MARKDOWN)
+
+                        logging.info(f'Generating response for inline query by {name}')
+                        response, total_tokens = await self.openai.get_chat_response(chat_id=user_id, query=query)
+
+                        # Edit the original message with the generated content
+                        await self.edit_message_with_retry(context, chat_id=None, message_id=inline_message_id,
+                                                           text=f'{query}\n\n_{answer_tr}:_\n{response}',
+                                                           is_inline=True)
+
+                    await self.wrap_with_indicator(update, context, _send_inline_query_response,
+                                                   constants.ChatAction.TYPING, is_inline=True)
+
+                self.add_chat_request_to_usage_tracker(user_id, total_tokens)

-                # Edit the original message with the generated content
-                await self.edit_message_with_retry(context,
-                                                   chat_id=None,
-                                                   message_id=inline_message_id,
-                                                   text=f'{query}\n\n_{answer_tr}:_\n{response}',
-                                                   is_inline=True)
except Exception as e:
logging.error(f'Failed to respond to an inline query via button callback: {e}')
logging.exception(e)
localized_answer = localized_text('chat_fail', self.config['bot_language'])
await self.edit_message_with_retry(context, chat_id=None, message_id=inline_message_id,
text=f"{query}\n\n_{answer_tr}:_\n{localized_answer} {str(e)}",
is_inline=True)

async def edit_message_with_retry(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int | None,
message_id: str, text: str, markdown: bool = True, is_inline: bool = False):
@@ -652,14 +713,15 @@ async def edit_message_with_retry(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int | None,
logging.warning(str(e))
raise e

-    async def wrap_with_indicator(self, update: Update, context: CallbackContext, chat_action: constants.ChatAction,
-                                  coroutine):
+    async def wrap_with_indicator(self, update: Update, context: CallbackContext, coroutine,
+                                  chat_action: constants.ChatAction = "", is_inline=False):
"""
Wraps a coroutine while repeatedly sending a chat action to the user.
"""
task = context.application.create_task(coroutine(), update=update)
while not task.done():
-            context.application.create_task(update.effective_chat.send_action(chat_action))
+            if not is_inline:
+                context.application.create_task(update.effective_chat.send_action(chat_action))
try:
await asyncio.wait_for(asyncio.shield(task), 4.5)
except asyncio.TimeoutError:
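Finally, to make the reordered `wrap_with_indicator` signature concrete, here is a dependency-free sketch of the same loop (the `print` stands in for python-telegram-bot's `send_action`, and `slow_reply` is a dummy coroutine; this is an illustration of the pattern, not the library API):

```python
import asyncio

async def wrap_with_indicator(coroutine, chat_action: str = "", is_inline: bool = False):
    # New argument order: the coroutine comes first, the chat action second,
    # and is_inline=True suppresses the indicator entirely.
    task = asyncio.ensure_future(coroutine())
    while not task.done():
        if not is_inline:
            print(f'chat action: {chat_action}')  # stand-in for send_action()
        try:
            await asyncio.wait_for(asyncio.shield(task), 4.5)
        except asyncio.TimeoutError:
            continue

async def slow_reply():
    await asyncio.sleep(10)
    print('reply sent')

asyncio.run(wrap_with_indicator(slow_reply, 'typing'))           # repeats the indicator
# asyncio.run(wrap_with_indicator(slow_reply, is_inline=True))   # inline: no indicator
```

The shield/wait_for pair is what lets the indicator repeat roughly every 4.5 seconds without cancelling the wrapped task on each timeout.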