Skip to content

Commit

Permalink
add async to zapier nla tools (#6791)
Browse files Browse the repository at this point in the history
Replace this comment with:
  - Description: Add Async functionality to Zapier NLA Tools
  - Issue:  n/a 
  - Dependencies: n/a
  - Tag maintainer: 

Maintainer responsibilities:
  - Agents / Tools / Toolkits: @vowelparrot
  - Async: @agola11

If no one reviews your PR within a few days, feel free to @-mention the
same people again.

See contribution guidelines for more information on how to write/run
tests, lint, etc:
https://github.com/hwchase17/langchain/blob/master/.github/CONTRIBUTING.md
  • Loading branch information
mplachter committed Jun 27, 2023
1 parent efe0d39 commit d6664af
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 20 deletions.
17 changes: 17 additions & 0 deletions langchain/agents/agent_toolkits/zapier/toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ def from_zapier_nla_wrapper(
]
return cls(tools=tools)

@classmethod
async def async_from_zapier_nla_wrapper(
cls, zapier_nla_wrapper: ZapierNLAWrapper
) -> "ZapierToolkit":
"""Create a toolkit from a ZapierNLAWrapper."""
actions = await zapier_nla_wrapper.alist()
tools = [
ZapierNLARunAction(
action_id=action["id"],
zapier_description=action["description"],
params_schema=action["params"],
api_wrapper=zapier_nla_wrapper,
)
for action in actions
]
return cls(tools=tools)

def get_tools(self) -> List[BaseTool]:
"""Get the tools in the toolkit."""
return self.tools
10 changes: 7 additions & 3 deletions langchain/tools/zapier/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,15 @@ def _run(

async def _arun(
self,
_: str,
instructions: str,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the Zapier NLA tool to return a list of all exposed user actions."""
raise NotImplementedError("ZapierNLAListActions does not support async")
return await self.api_wrapper.arun_as_str(
self.action_id,
instructions,
self.params,
)


ZapierNLARunAction.__doc__ = (
Expand Down Expand Up @@ -184,7 +188,7 @@ async def _arun(
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the Zapier NLA tool to return a list of all exposed user actions."""
raise NotImplementedError("ZapierNLAListActions does not support async")
return await self.api_wrapper.alist_as_str()


ZapierNLAListActions.__doc__ = (
Expand Down
135 changes: 118 additions & 17 deletions langchain/utilities/zapier.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
developer support.
"""
import json
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import aiohttp
import requests
from pydantic import BaseModel, Extra, root_validator
from requests import Request, Session
Expand Down Expand Up @@ -49,36 +50,63 @@ class Config:

extra = Extra.forbid

def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(
{
"Accept": "application/json",
"Content-Type": "application/json",
}
)
def _format_headers(self) -> Dict[str, str]:
"""Format headers for requests."""
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}

if self.zapier_nla_oauth_access_token:
session.headers.update(
headers.update(
{"Authorization": f"Bearer {self.zapier_nla_oauth_access_token}"}
)
else:
session.params = {"api_key": self.zapier_nla_api_key}
headers.update({"X-API-Key": self.zapier_nla_api_key})

return headers

def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(self._format_headers())
return session

def _get_action_request(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Request:
async def _arequest(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
"""Make an async request."""
async with aiohttp.ClientSession(headers=self._format_headers()) as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.json()

def _create_action_payload( # type: ignore[no-untyped-def]
self, instructions: str, params: Optional[Dict] = None, preview_only=False
) -> Dict:
"""Create a payload for an action."""
data = params if params else {}
data.update(
{
"instructions": instructions,
}
)
if preview_only:
data.update({"preview_only": True})
return data

def _create_action_url(self, action_id: str) -> str:
"""Create a url for an action."""
return self.zapier_nla_api_base + f"exposed/{action_id}/execute/"

def _create_action_request( # type: ignore[no-untyped-def]
self,
action_id: str,
instructions: str,
params: Optional[Dict] = None,
preview_only=False,
) -> Request:
data = self._create_action_payload(instructions, params, preview_only)
return Request(
"POST",
self.zapier_nla_api_base + f"exposed/{action_id}/execute/",
self._create_action_url(action_id),
json=data,
)

Expand Down Expand Up @@ -107,6 +135,28 @@ def validate_environment(cls, values: Dict) -> Dict:

return values

async def alist(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
actions here: https://nla.zapier.com/demo/start/
The return list can be empty if no actions exposed. Else will contain
a list of action objects:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params` will always contain an `instructions` key, the only required
param. All others optional and if provided will override any AI guesses
(see "understanding the AI guessing flow" here:
https://nla.zapier.com/api/v1/docs)
"""
response = await self._arequest("GET", self.zapier_nla_api_base + "exposed/")
return response["results"]

def list(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
Expand Down Expand Up @@ -157,11 +207,29 @@ def run(
call.
"""
session = self._get_session()
request = self._get_action_request(action_id, instructions, params)
request = self._create_action_request(action_id, instructions, params)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["result"]

async def arun(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Executes an action that is identified by action_id, must be exposed
(enabled) by the current user (associated with the set api_key). Change
your exposed actions here: https://nla.zapier.com/demo/start/
The return JSON is guaranteed to be less than ~500 words (350
tokens) making it safe to inject into the prompt of another LLM
call.
"""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params),
)
return response["result"]

def preview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
Expand All @@ -171,25 +239,58 @@ def preview(
session = self._get_session()
params = params if params else {}
params.update({"preview_only": True})
request = self._get_action_request(action_id, instructions, params)
request = self._create_action_request(action_id, instructions, params, True)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["input_params"]

async def apreview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Same as run, but instead of actually executing the action, will
instead return a preview of params that have been guessed by the AI in
case you need to explicitly review before executing."""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params, preview_only=True),
)
return response["result"]

def run_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.run(*args, **kwargs)
return json.dumps(data)

async def arun_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.arun(*args, **kwargs)
return json.dumps(data)

def preview_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.preview(*args, **kwargs)
return json.dumps(data)

async def apreview_as_str( # type: ignore[no-untyped-def]
self, *args, **kwargs
) -> str:
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.apreview(*args, **kwargs)
return json.dumps(data)

def list_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = self.list()
return json.dumps(actions)

async def alist_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = await self.alist()
return json.dumps(actions)
Loading

0 comments on commit d6664af

Please sign in to comment.