Skip to content

Commit

Permalink
Add webhook endpoint, and other fixes (#1766)
Browse files Browse the repository at this point in the history
* Update FastAPI app configuration in main.py

* Add webhook endpoint

* Add Observer model and related changes to database models

* Update refresh button text in ParameterComponent

* Update database models and main.py, and add webhook endpoint and utils for flow model

* Add WebhookComponent to langflow data components

* Update webhook component names in flow data and remove unused code in edge/base.py and graph/base.py

* 🐛 (endpoints.py): fix an issue where the user_id was not being correctly assigned based on the api_key_user or flow object

* 📝 (endpoints.py): Update documentation for the webhook_run_flow endpoint to provide clearer information on its purpose, arguments, and return value. Remove outdated request JSON structure and example request sections. Add information on the arguments and exceptions raised for better clarity.

* chore: Remove unnecessary trailing commas in code files

* feat: Add yield statement to get_lifespan function

* feat: Update webhook columns revision ID

* Refactor import statements for OpenAI conversational agent

* refactor(constants.ts): remove unused tabsArray function and update function name to createTabsArray for clarity
feat(constants.ts): add support for including webhook code in tabs array based on flow webhook property
feat(index.tsx): update function calls to use new createTabsArray function and include webhook code in tabs array based on flow webhook property

refactor(utils.ts): remove trailing commas in function parameters to improve code consistency and readability
feat(utils.ts): add support for including webhook cURL code and tweaks code in tabs array creation for better code organization and presentation

* chore: Update folder_id parameter type to str in API endpoints

* feat: Handle invalid JSON payload in WebhookComponent

The code changes in `Webhook.py` modify the `build` method of the `WebhookComponent` class. If an invalid JSON payload is provided, instead of raising a `ValueError`, the code now handles the exception by creating a default payload with the provided data. The commit also includes a message that describes the invalid JSON payload for better error tracking.

Note: This commit message follows the convention used in the recent user commits.

* refactor: Remove unnecessary code in main.py

* refactor: Update Alembic revision IDs for webhook columns

The Alembic revision IDs for adding webhook columns have been updated to reflect the latest changes. This ensures consistency and proper tracking of the database schema modifications.

Note: The commit message has been generated based on the provided code changes and recent commits.

* 🐛 (endpoints.py): Fix handling of flow_id_or_name variable to correctly identify flow ID or endpoint name for running a flow. Improve error handling for invalid UUID strings and flow not found scenarios.

* refactor: Handle invalid JSON payload in WebhookComponent

The code changes in `Webhook.py` modify the `build` method of the `WebhookComponent` class. If an invalid JSON payload is provided, instead of raising a `ValueError`, the code now handles the exception by creating a default payload with the provided data.

* refactor: Update get_flow_by_id function to handle None flow_id parameter

The get_flow_by_id function in utils.py has been updated to handle the case where the flow_id parameter is None. Previously, the function would raise an exception if flow_id was not provided. Now, if flow_id is None, the function returns None instead of querying the database. This change improves the robustness of the function and allows for more flexible usage.

Note: The commit message has been generated based on the provided code changes and recent commits.

* 📝 (endpoints.py): Add import statement for get_flow_by_id_or_endpoint_name function
📝 (flow.py): Add get_flow_by_id_or_endpoint_name function to retrieve a Flow object by ID or endpoint name

* refactor: Update get_flow_by_id_or_endpoint_name function to handle None flow_id parameter

* 🐛 (flow.py): replace ValueError with HTTPException to return a 404 status code when flow identifier is not found

* refactor: Update error message for flow identifier not found

The code changes in `test_endpoints.py` update the error message when a flow identifier is not found. Previously, the error message referenced the flow ID directly, but now it uses the more generic term "Flow identifier" to account for cases where the identifier could be an endpoint name. This change improves the clarity and flexibility of the error message.

Note: This commit message has been generated based on the provided code changes and recent commits.

* 🔧 (endpoints.py): remove unnecessary flow_id_or_name parameter from the simple_run_flow function call to improve code readability and maintainability
  • Loading branch information
ogabrielluiz committed May 31, 2024
1 parent 1f8d0eb commit 5e2f4b8
Show file tree
Hide file tree
Showing 41 changed files with 467 additions and 294 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Add webhook columns
Revision ID: 631faacf5da2
Revises: 1c79524817ed
Create Date: 2024-04-22 15:14:43.454784
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision: str = "631faacf5da2"
down_revision: Union[str, None] = "1c79524817ed"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "flow" in table_names and "webhook" not in column_names:
batch_op.add_column(sa.Column("webhook", sa.Boolean(), nullable=True))

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "flow" in table_names and "webhook" in column_names:
batch_op.drop_column("webhook")

# ### end Alembic commands ###
220 changes: 141 additions & 79 deletions src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from uuid import UUID

import sqlalchemy as sa
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status
from loguru import logger
from sqlmodel import Session, select

Expand All @@ -22,11 +22,14 @@
from langflow.custom import CustomComponent
from langflow.custom.utils import build_custom_component_template
from langflow.graph.graph.base import Graph
from langflow.graph.schema import RunOutputs
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
from langflow.processing.process import process_tweaks, run_graph_internal
from langflow.schema.graph import Tweaks
from langflow.services.auth.utils import api_key_security, get_current_active_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow, get_flow_by_id
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service
from langflow.services.session.service import SessionService
Expand All @@ -53,10 +56,75 @@ def get_all(
raise HTTPException(status_code=500, detail=str(exc)) from exc


async def simple_run_flow(
db: Session,
flow: Flow,
input_request: SimplifiedAPIRequest,
session_service: SessionService,
stream: bool = False,
api_key_user: Optional[User] = None,
):
try:
task_result: List[RunOutputs] = []
artifacts = {}
user_id = api_key_user.id if api_key_user else None
flow_id_str = str(flow.id)
if input_request.session_id:
session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id_str)
graph, artifacts = session_data if session_data else (None, None)
if graph is None:
raise ValueError(f"Session {input_request.session_id} not found")
else:
if flow.data is None:
raise ValueError(f"Flow {flow_id_str} has no data")
graph_data = flow.data
graph_data = process_tweaks(graph_data, input_request.tweaks or {})
graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(user_id))
inputs = [
InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type)
]
# outputs is a list of all components that should return output
# we need to get them by checking their type
# if the output type is debug, we return all outputs
# if the output type is any, we return all outputs that are either chat or text
# if the output type is chat or text, we return only the outputs that match the type
if input_request.output_component:
outputs = [input_request.output_component]
else:
outputs = [
vertex.id
for vertex in graph.vertices
if input_request.output_type == "debug"
or (
vertex.is_output
and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower())
)
]
task_result, session_id = await run_graph_internal(
graph=graph,
flow_id=flow_id_str,
session_id=input_request.session_id,
inputs=inputs,
outputs=outputs,
artifacts=artifacts,
session_service=session_service,
stream=stream,
)

return RunResponse(outputs=task_result, session_id=session_id)

except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
logger.error(f"Flow ID {flow_id_str} is not a valid UUID")
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise ValueError(str(exc)) from exc


@router.post("/run/{flow_id_or_name}", response_model=RunResponse, response_model_exclude_none=True)
async def simplified_run_flow(
db: Annotated[Session, Depends(get_session)],
flow_id_or_name: str,
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: User = Depends(api_key_security),
Expand All @@ -67,7 +135,7 @@ async def simplified_run_flow(
### Parameters:
- `db` (Session): Database session for executing queries.
- `flow_id` (str): Unique identifier of the flow to be executed.
- `flow_id_or_name` (str): ID or endpoint name of the flow to run.
- `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks, and session ID.
- `api_key_user` (User): User object derived from the provided API key, used for authentication.
- `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching.
Expand Down Expand Up @@ -110,89 +178,21 @@ async def simplified_run_flow(
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching.
"""
session_id = input_request.session_id
endpoint_name = None
flow_id_str = None
try:
try:
flow_id = UUID(flow_id_or_name)

except ValueError:
endpoint_name = flow_id_or_name
flow = db.exec(
select(Flow).where(Flow.endpoint_name == endpoint_name).where(Flow.user_id == api_key_user.id)
).first()
if flow is None:
raise ValueError(f"Flow with endpoint name {endpoint_name} not found")
flow_id = flow.id

flow_id_str = str(flow_id)
artifacts = {}
if input_request.session_id:
session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id_str)
graph, artifacts = session_data if session_data else (None, None)
if graph is None:
raise ValueError(f"Session {input_request.session_id} not found")
else:
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
flow = db.exec(select(Flow).where(Flow.id == flow_id_str).where(Flow.user_id == api_key_user.id)).first()
if flow is None:
raise ValueError(f"Flow {flow_id_str} not found")

if flow.data is None:
raise ValueError(f"Flow {flow_id_str} has no data")
graph_data = flow.data

graph_data = process_tweaks(graph_data, input_request.tweaks or {}, stream=stream)
graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(api_key_user.id))
inputs = [
InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type)
]
# outputs is a list of all components that should return output
# we need to get them by checking their type
# if the output type is debug, we return all outputs
# if the output type is any, we return all outputs that are either chat or text
# if the output type is chat or text, we return only the outputs that match the type
if input_request.output_component:
outputs = [input_request.output_component]
else:
outputs = [
vertex.id
for vertex in graph.vertices
if input_request.output_type == "debug"
or (
vertex.is_output
and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower())
)
]
task_result, session_id = await run_graph_internal(
graph=graph,
flow_id=flow_id_str,
session_id=input_request.session_id,
inputs=inputs,
outputs=outputs,
artifacts=artifacts,
return await simple_run_flow(
db=db,
flow=flow,
input_request=input_request,
session_service=session_service,
stream=stream,
api_key_user=api_key_user,
)

return RunResponse(outputs=task_result, session_id=session_id)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
except ValueError as exc:
if "badly formed hexadecimal UUID string" in str(exc):
logger.error(f"Flow ID {flow_id_str} is not a valid UUID")
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if flow_id_str and f"Flow {flow_id_str} not found" in str(exc):
logger.error(f"Flow {flow_id_str} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif endpoint_name and f"Flow with endpoint name {endpoint_name} not found" in str(exc):
logger.error(f"Flow with endpoint name {endpoint_name} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif session_id and f"Session {session_id} not found" in str(exc):
logger.error(f"Session {session_id} not found")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
if "not found" in str(exc):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
else:
logger.exception(exc)
Expand All @@ -202,6 +202,68 @@ async def simplified_run_flow(
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc


@router.post("/webhook/{flow_id}", response_model=dict, status_code=HTTPStatus.ACCEPTED)
async def webhook_run_flow(
db: Annotated[Session, Depends(get_session)],
flow: Annotated[Flow, Depends(get_flow_by_id)],
request: Request,
background_tasks: BackgroundTasks,
session_service: SessionService = Depends(get_session_service),
):
"""
Run a flow using a webhook request.
Args:
db (Session): The database session.
request (Request): The incoming HTTP request.
background_tasks (BackgroundTasks): The background tasks manager.
session_service (SessionService, optional): The session service. Defaults to Depends(get_session_service).
flow (Flow, optional): The flow to be executed. Defaults to Depends(get_flow_by_id).
Returns:
dict: A dictionary containing the status of the task.
Raises:
HTTPException: If the flow is not found or if there is an error processing the request.
"""
try:
logger.debug("Received webhook request")
data = await request.body()
if not data:
logger.error("Request body is empty")
raise ValueError(
"Request body is empty. You should provide a JSON payload containing the flow ID.",
)

# get all webhook components in the flow
webhook_components = get_all_webhook_components_in_flow(flow.data)
tweaks = {}
data_dict = await request.json()
for component in webhook_components:
tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data}
input_request = SimplifiedAPIRequest(
input_value=data_dict.get("input_value", ""),
input_type=data_dict.get("input_type", "chat"),
output_type=data_dict.get("output_type", "chat"),
tweaks=tweaks,
session_id=data_dict.get("session_id"),
)
logger.debug("Starting background task")
background_tasks.add_task(
simple_run_flow,
db=db,
flow=flow,
input_request=input_request,
session_service=session_service,
)
return {"message": "Task started in the background", "status": "in progress"}
except Exception as exc:
if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc):
raise HTTPException(status_code=400, detail=str(exc)) from exc
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc


@router.post("/run/advanced/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
async def experimental_run_flow(
session: Annotated[Session, Depends(get_session)],
Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
from langflow.services.database.models.folder.model import Folder
from langflow.services.database.models.user.model import User
Expand Down Expand Up @@ -150,6 +151,8 @@ def update_flow(
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
webhook_component = get_webhook_component_in_flow(db_flow.data)
db_flow.webhook = webhook_component is not None
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
Expand Down
9 changes: 4 additions & 5 deletions src/backend/base/langflow/api/v1/folders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import List
from uuid import UUID

import orjson
from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status
Expand Down Expand Up @@ -88,7 +87,7 @@ def read_folders(
def read_folder(
*,
session: Session = Depends(get_session),
folder_id: UUID,
folder_id: str,
current_user: User = Depends(get_current_active_user),
):
try:
Expand All @@ -106,7 +105,7 @@ def read_folder(
def update_folder(
*,
session: Session = Depends(get_session),
folder_id: UUID,
folder_id: str,
folder: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields
current_user: User = Depends(get_current_active_user),
):
Expand Down Expand Up @@ -155,7 +154,7 @@ def update_folder(
def delete_folder(
*,
session: Session = Depends(get_session),
folder_id: UUID,
folder_id: str,
current_user: User = Depends(get_current_active_user),
):
try:
Expand All @@ -177,7 +176,7 @@ def delete_folder(
async def download_file(
*,
session: Session = Depends(get_session),
folder_id: UUID,
folder_id: str,
current_user: User = Depends(get_current_active_user),
):
"""Download all flows from folder."""
Expand Down
Loading

0 comments on commit 5e2f4b8

Please sign in to comment.