Skip to content

Commit

Permalink
Add tests & slight refactor to TelegramTransport key handling (#528)
Browse files Browse the repository at this point in the history
This PR:

1. Adds a few additional tests to TelegramTransport
2. Improves TelegramTransport's handling of dynamically set keys (it
already had this; this PR makes it a bit more robust)

This will enable a few nice things form a user perspective:

1. Users can re-bind an Agent via the Web UI to a new Telegram Bot
2. Users can defer binding their Agent to a Telegram Bot until after
they've created an instance

Follow-on work is a PR to do for Telegram what we just did for Slack in
the web UI:


![image](https://github.com/steamship-core/python-client/assets/63262/1974dc4a-2171-42cf-b90b-d0e6d79de18c)
  • Loading branch information
eob committed Aug 25, 2023
1 parent 5d3ee70 commit 05a8f0e
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/steamship/agents/examples/telegram_bot.py
Expand Up @@ -39,7 +39,7 @@ class TelegramBot(AgentService):
"""

class TelegramBotConfig(Config):
bot_token: str = Field(description="The secret token for your Telegram bot")
telegram_bot_token: str = Field(description="The secret token for your Telegram bot")

config: TelegramBotConfig

Expand All @@ -66,7 +66,7 @@ def __init__(self, **kwargs):
TelegramTransport(
client=self.client,
# IMPORTANT: This is the TelegramTransportConfig, not the AgentService config!
config=TelegramTransportConfig(bot_token=self.config.bot_token),
config=TelegramTransportConfig(bot_token=self.config.telegram_bot_token),
agent_service=self,
)
)
Expand Down
11 changes: 9 additions & 2 deletions src/steamship/agents/mixins/transports/slack.py
Expand Up @@ -556,15 +556,19 @@ def slack_respond(self, **kwargs) -> InvocableResponse[str]:
@post("set_slack_access_token")
def set_slack_access_token(self, token: str) -> InvocableResponse[str]:
"""Set the slack access token."""
kv = KeyValueStore(client=self.agent_service.client, store_identifier=SETTINGS_KVSTORE_KEY)
kv = KeyValueStore(
client=self.agent_service.client, store_identifier=self.setting_store_key()
)
kv.set("slack_token", {"token": token})
return InvocableResponse(string="OK")

def get_slack_access_token(self) -> Optional[str]:
"""Return the Slack Access token, which permits the agent to post to Slack."""
if self.bot_token:
return self.bot_token
kv = KeyValueStore(client=self.agent_service.client, store_identifier=SETTINGS_KVSTORE_KEY)
kv = KeyValueStore(
client=self.agent_service.client, store_identifier=self.setting_store_key()
)
v = kv.get("slack_token")
if not v:
return None
Expand All @@ -578,3 +582,6 @@ def is_slack_token_set(self) -> InvocableResponse[bool]:
if token is None:
return InvocableResponse(json=False)
return InvocableResponse(json=True)

def setting_store_key(self):
return f"{SETTINGS_KVSTORE_KEY}-{self.agent_service.context.invocable_instance_handle}"
170 changes: 138 additions & 32 deletions src/steamship/agents/mixins/transports/telegram.py
Expand Up @@ -9,9 +9,11 @@
from steamship.agents.mixins.transports.transport import Transport
from steamship.agents.schema import EmitFunc, Metadata
from steamship.agents.service.agent_service import AgentService
from steamship.invocable import Config, InvocableResponse, InvocationContext, post
from steamship.invocable import Config, InvocableResponse, post
from steamship.utils.kv_store import KeyValueStore

SETTINGS_KVSTORE_KEY = "telegram-transport"


class TelegramTransportConfig(Config):
bot_token: Optional[str] = Field("", description="The secret token for your Telegram bot")
Expand All @@ -23,11 +25,9 @@ class TelegramTransportConfig(Config):
class TelegramTransport(Transport):
"""Experimental base class to encapsulate a Telegram communication channel."""

api_root: str
bot_token: str
bot_token: Optional[str]
agent_service: AgentService
config: TelegramTransportConfig
context: InvocationContext

def __init__(
self,
Expand All @@ -37,29 +37,37 @@ def __init__(
):
super().__init__(client=client)
self.config = config
self.store = KeyValueStore(self.client, store_identifier="_telegram_config")
bot_token = (self.store.get("bot_token") or {}).get("token")
self.bot_token = config.bot_token or bot_token
self.api_root = f"{config.api_base}{self.bot_token}"
self.agent_service = agent_service
try:
self.bot_token = self.get_telegram_access_token() or None
except BaseException as e:
logging.warning(e)
self.bot_token = None

def instance_init(self):
if self.bot_token:
self.api_root = f"{self.config.api_base}{self.config.bot_token or self.bot_token}"
if self.get_telegram_access_token():
try:
self._instance_init()
self.telegram_connect_webhook()
except Exception: # noqa: S110
pass

def _instance_init(self):
@post("telegram_connect_webhook")
def telegram_connect_webhook(self):
"""Register this AgentService with Telegram."""
webhook_url = self.agent_service.context.invocable_url + "telegram_respond"

api_root = self.get_api_root()
if not api_root:
raise SteamshipError(
message="Unable to determine Telegram API root -- perhaps your bot token isn't set?"
)

logging.info(
f"Setting Telegram webhook URL: {webhook_url}. Post is to {self.api_root}/setWebhook"
f"Setting Telegram webhook URL: {webhook_url}. Post is to {api_root}/setWebhook"
)

response = requests.get(
f"{self.api_root}/setWebhook",
f"{api_root}/setWebhook",
params={
"url": webhook_url,
"allowed_updates": ["message"],
Expand All @@ -74,31 +82,38 @@ def _instance_init(self):

@post("telegram_webhook_info")
def telegram_webhook_info(self) -> dict:
return requests.get(self.api_root + "/getWebhookInfo").json()

@post("connect_telegram")
def connect_telegram(self, bot_token: str):
self.store.set("bot_token", {"token": bot_token})
self.bot_token = bot_token
api_root = self.get_api_root()
if not api_root:
raise SteamshipError(
message="Unable to fetch Telegram API info -- perhaps your bot token isn't set?"
)

try:
self.instance_init()
return "OK"
except Exception as e:
return f"Could not set webhook for bot. Exception: {e}"
return requests.get(api_root + "/getWebhookInfo").json()

@post("telegram_disconnect_webhook")
def telegram_disconnect_webhook(self, *args, **kwargs):
"""Unsubscribe from Telegram updates."""
requests.get(f"{self.api_root}/deleteWebhook")
api_root = self.get_api_root()
if not api_root:
raise SteamshipError(
message="Unable to disconnect from Telegram -- perhaps your bot token isn't set?"
)

requests.get(f"{api_root}/deleteWebhook")

def _send(self, blocks: [Block], metadata: Metadata):
"""Send a response to the Telegram chat."""
api_root = self.get_api_root()
if not api_root:
raise SteamshipError(
message="Unable to send to Telegram -- perhaps your bot token isn't set?"
)

for block in blocks:
chat_id = block.chat_id
if block.is_text() or block.text:
params = {"chat_id": int(chat_id), "text": block.text}
requests.get(f"{self.api_root}/sendMessage", params=params)
requests.get(f"{api_root}/sendMessage", params=params)
elif block.is_image() or block.is_audio() or block.is_video():
if block.is_image():
suffix = "sendPhoto"
Expand All @@ -115,7 +130,7 @@ def _send(self, blocks: [Block], metadata: Metadata):
temp_file.write(_bytes)
temp_file.seek(0)
resp = requests.post(
url=f"{self.api_root}/{suffix}?chat_id={chat_id}",
url=f"{api_root}/{suffix}?chat_id={chat_id}",
files={key: temp_file},
)
if resp.status_code != 200:
Expand All @@ -129,12 +144,16 @@ def _send(self, blocks: [Block], metadata: Metadata):
)

def _get_file(self, file_id: str) -> Dict[str, Any]:
return requests.get(f"{self.api_root}/getFile", params={"file_id": file_id}).json()[
"result"
]
api_root = self.get_api_root()
if not api_root:
raise SteamshipError(
message="Unable to get Telegram file -- perhaps your bot token isn't set?"
)

return requests.get(f"{api_root}/getFile", params={"file_id": file_id}).json()["result"]

def _get_file_url(self, file_id: str) -> str:
return f"https://api.telegram.org/file/bot{self.bot_token}/{self._get_file(file_id)['file_path']}"
return f"https://api.telegram.org/file/bot{self.get_telegram_access_token()}/{self._get_file(file_id)['file_path']}"

def _download_file(self, file_id: str):
result = requests.get(self._get_file_url(file_id))
Expand Down Expand Up @@ -233,3 +252,90 @@ def telegram_respond(self, **kwargs) -> InvocableResponse[str]:
self.send([response])
# Even if we do nothing, make sure we return ok
return InvocableResponse(string="OK")

@post("set_telegram_access_token")
def set_telegram_access_token(self, token: str) -> InvocableResponse[str]:
"""Set the telegram access token."""
existing_token = self.get_telegram_access_token()
if existing_token:
try:
self.telegram_disconnect_webhook()
except BaseException as e:
# Note: we don't want to fully fail here, because that would mean that a bot user that had some
# other error relating to disconnecting would never be able to RE-connect to a new bot.
logging.error(e)

kv = KeyValueStore(
client=self.agent_service.client, store_identifier=self.setting_store_key()
)
kv.set("telegram_token", {"token": token})

# Now attempt to modify the connection in Telegram
self.bot_token = token
try:
self.telegram_connect_webhook()
return InvocableResponse(string="OK")
except Exception as e:
raise SteamshipError(message=f"Could not set Telegram Webhook. Exception: {e}")

def get_api_root(self) -> Optional[str]:
"""Return the API root"""
bot_token = self.get_telegram_access_token()
api_base = self.config.api_base

# Ensure we have an API Base
if not api_base:
raise SteamshipError(message="Missing Telegram API Base")

# Ensure it ends in a trailing slash
if api_base[-1] != "/":
api_base += "/"

if bot_token:
if ".steamship.run/" in api_base:
# This is a special case for our testing pipeline -- it contains a mock Telegram server.
return api_base
else:
return f"{api_base}{bot_token}"
else:
return None

def setting_store_key(self):
return f"{SETTINGS_KVSTORE_KEY}-{self.agent_service.context.invocable_instance_handle}"

def get_telegram_access_token(self) -> Optional[str]:
"""Return the Telegram Access token, which permits the agent to post to Telegram."""

# Warning: This can't be an 'is not None' check since the config system uses an empty string to represent None
if self.bot_token:
return self.bot_token

_dynamically_set_token = None
_fallback_token = None

# Prefer the dynamically set token if available
kv = KeyValueStore(
client=self.agent_service.client, store_identifier=self.setting_store_key()
)
v = kv.get("telegram_token")
if v:
_dynamically_set_token = v.get("token", None)

# Fall back on the config-provided one
if self.config:
_fallback_token = self.config.bot_token

_return_token = _dynamically_set_token or _fallback_token

# Cache it to avoid another KV Store lookup and return
self.bot_token = _return_token
return _return_token

@post("is_telegram_token_set")
def is_telegram_token_set(self) -> InvocableResponse[bool]:
"""Return whether the Telegram token has been set as a way for a remote UI to check status."""

# Warning: This can't be an 'is not None' check since the config system uses an empty string to represent None
if not self.get_telegram_access_token():
return InvocableResponse(json=False)
return InvocableResponse(json=True)
18 changes: 18 additions & 0 deletions tests/assets/demo_package_spec.json
Expand Up @@ -225,6 +225,15 @@
"verb": "GET",
"doc": null
},
{
"args": [],
"className": "TestPackage",
"config": {},
"doc": null,
"path": "/resp_false",
"returns": "<class 'foo.InvocableResponse[dict]'>",
"verb": "POST"
},
{
"returns": "<class 'foo.InvocableResponse[bytes]'>",
"args": [],
Expand Down Expand Up @@ -252,6 +261,15 @@
"verb": "GET",
"doc": null
},
{
"args": [],
"className": "TestPackage",
"config": {},
"doc": null,
"path": "/resp_true",
"returns": "<class 'foo.InvocableResponse[dict]'>",
"verb": "POST"
},
{
"returns": "<class 'foo.InvocableResponse[dict]'>",
"args": [],
Expand Down
8 changes: 8 additions & 0 deletions tests/assets/packages/demo_package.py
Expand Up @@ -48,6 +48,14 @@ def resp_dict(self) -> InvocableResponse[dict]:
def resp_obj(self) -> InvocableResponse[dict]:
return InvocableResponse(json=TestObj(name="Foo"))

@post("resp_true")
def resp_true(self) -> InvocableResponse[dict]:
return InvocableResponse(json=True)

@post("resp_false")
def resp_false(self) -> InvocableResponse[dict]:
return InvocableResponse(json=False)

@get("resp_binary")
def resp_binary(self) -> InvocableResponse[bytes]:
_bytes = base64.b64decode(PALM_TREE_BASE_64)
Expand Down
27 changes: 14 additions & 13 deletions tests/assets/packages/transports/test_transports_agent.py
Expand Up @@ -10,8 +10,8 @@


class TestTransportsAgentConfig(Config):
bot_token: Optional[str] = ""
api_base: Optional[str] = ""
telegram_token: Optional[str] = ""
telegram_api_base: Optional[str] = ""
slack_api_base: Optional[str] = ""


Expand Down Expand Up @@ -78,25 +78,26 @@ def __init__(
# Including the web widget transport on the telegram test
# agent to make sure it doesn't interfere
self.add_mixin(SteamshipWidgetTransport(client=client, agent_service=self))
if self.config.bot_token:
# Only add the mixin if a telegram key was provided.
self.add_mixin(
TelegramTransport(
client=client,
# TODO: We need to rename these telegram_token and telegram_api_base
config=TelegramTransportConfig(
bot_token=self.config.bot_token, api_base=self.config.api_base
),
agent_service=self,
)

# Only add the mixin if a telegram key was provided.
self.add_mixin(
TelegramTransport(
client=client,
config=TelegramTransportConfig(
bot_token=self.config.telegram_token, api_base=self.config.telegram_api_base
),
agent_service=self,
)
)

self.add_mixin(
SlackTransport(
client=client,
config=SlackTransportConfig(slack_api_base=self.config.slack_api_base),
agent_service=self,
)
)

# TODO: Future Transport authors: add your transport here.

@classmethod
Expand Down

0 comments on commit 05a8f0e

Please sign in to comment.