Skip to content

Commit

Permalink
Add support for running flows by endpoint name (#2012)
Browse files Browse the repository at this point in the history
* feat: Add support for running flows by endpoint name

This commit modifies the `simplified_run_flow` endpoint in `endpoints.py` to allow running flows using the endpoint name instead of the flow ID. It introduces a new route parameter `flow_id_or_name` which can accept either a UUID or a string representing the endpoint name. The code first attempts to parse the parameter as a UUID, and if that fails, it queries the database to find a flow with the matching endpoint name. This change improves the usability of the API by providing an alternative way to identify flows for execution.

* feat: Add endpoint_name field to FlowType

This commit adds the `endpoint_name` field to the `FlowType` interface in the `index.ts` file. The `endpoint_name` field is an optional string that represents the name of the endpoint associated with the flow. This change allows for more flexibility in identifying flows by endpoint name instead of just the flow ID. It improves the usability of the codebase by providing an alternative way to reference flows.

* 🐛 (endpoints.py): change type of flow_id_or_name parameter from Union[str, UUID] to str to simplify the API and improve readability

* feat: Add migration utility functions for table, column, foreign key, and constraint existence checks

This commit adds utility functions to the `migration.py` module in the `langflow.utils` package. These functions provide convenient ways to check the existence of tables, columns, foreign keys, and constraints in a database using SQLAlchemy. The functions `table_exists`, `column_exists`, `foreign_key_exists`, and `constraint_exists` take the table name, column name, foreign key name, and constraint name respectively, along with the SQLAlchemy engine or connection object. They use the `Inspector` class from `sqlalchemy.engine.reflection` to retrieve the necessary information and return a boolean value indicating whether the specified element exists in the database. These utility functions improve the readability and maintainability of the codebase by encapsulating the common existence checks in a reusable and modular way.

* feat: Add unique constraints for per-user folders and flows

This commit adds unique constraints for per-user folders and flows in the database. It introduces the `unique_folder_name` constraint for the `folder` table, ensuring that each user can have only one folder with a specific name. Similarly, it adds the `unique_flow_endpoint_name` and `unique_flow_name` constraints for the `flow` table, enforcing uniqueness of endpoint names and flow names per user. These constraints improve data integrity and prevent duplicate entries in the database, providing a more robust and reliable system.

* feat: Add poetry installation and caching steps to GitHub Actions workflow

This commit updates the GitHub Actions workflow file `action.yml` to include additional steps for installing poetry and caching its dependencies. The `run` step now installs poetry using the specified version and ensures that the poetry binary is available in the PATH. Additionally, the workflow now includes a step to restore pip and poetry cached dependencies using the `actions/cache` action. These changes improve the workflow by providing a more efficient and reliable way to manage poetry dependencies and caching.

* refactor: Improve error handling in update_flow function

This commit improves the error handling in the `update_flow` function in `flows.py`. It adds a new `elif` condition to check if the exception is an instance of `HTTPException` and re-raises it. This ensures that any `HTTPException` raised during the update process is properly handled and returned as a response. Additionally, it removes the unnecessary `else` block and simplifies the code logic. This refactor enhances the reliability and maintainability of the `update_flow` function.
  • Loading branch information
ogabrielluiz committed May 30, 2024
1 parent 9d2fe80 commit 94e8bf1
Show file tree
Hide file tree
Showing 22 changed files with 504 additions and 136 deletions.
7 changes: 6 additions & 1 deletion .github/actions/poetry_caching/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ runs:
POETRY_VERSION: ${{ inputs.poetry-version }}
PYTHON_VERSION: ${{ inputs.python-version }}
# Install poetry using the python version installed by setup-python step.
run: pipx install "poetry==$POETRY_VERSION" --python '${{ steps.setup-python.outputs.python-path }}' --verbose
run: |
pipx install "poetry==$POETRY_VERSION" --python '${{ steps.setup-python.outputs.python-path }}' --verbose
pipx ensurepath
# Ensure the poetry binary is available in the PATH.
# Test that the poetry binary is available.
poetry --version
- name: Restore pip and poetry cached dependencies
uses: actions/cache@v4
Expand Down
5 changes: 1 addition & 4 deletions src/backend/base/langflow/alembic/script.py.mako
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ from alembic import op
import sqlalchemy as sa
import sqlmodel
from sqlalchemy.engine.reflection import Inspector
from langflow.utils import migration
${imports if imports else ""}

# revision identifiers, used by Alembic.
Expand All @@ -22,13 +23,9 @@ depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}

def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
${upgrades if upgrades else "pass"}


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
${downgrades if downgrades else "pass"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Add unique constraints per user in folder table
Revision ID: 1c79524817ed
Revises: 3bb0ddf32dfb
Create Date: 2024-05-29 23:12:09.146880
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision: str = "1c79524817ed"
down_revision: Union[str, None] = "3bb0ddf32dfb"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("folder")]
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("folder", schema=None) as batch_op:
if "unique_folder_name" not in constraints_names:
batch_op.create_unique_constraint("unique_folder_name", ["user_id", "name"])

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("folder")]
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("folder", schema=None) as batch_op:
if "unique_folder_name" in constraints_names:
batch_op.drop_constraint("unique_folder_name", type_="unique")

# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Add unique constraints per user in flow table
Revision ID: 3bb0ddf32dfb
Revises: a72f5cf9c2f9
Create Date: 2024-05-29 23:08:43.935040
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision: str = "3bb0ddf32dfb"
down_revision: Union[str, None] = "a72f5cf9c2f9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes_names = [index["name"] for index in inspector.get_indexes("flow")]
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "ix_flow_endpoint_name" in indexes_names:
batch_op.drop_index("ix_flow_endpoint_name")
batch_op.create_index(batch_op.f("ix_flow_endpoint_name"), ["endpoint_name"], unique=False)
if "unique_flow_endpoint_name" not in constraints_names:
batch_op.create_unique_constraint("unique_flow_endpoint_name", ["user_id", "endpoint_name"])
if "unique_flow_name" not in constraints_names:
batch_op.create_unique_constraint("unique_flow_name", ["user_id", "name"])

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes_names = [index["name"] for index in inspector.get_indexes("flow")]
constraints_names = [constraint["name"] for constraint in inspector.get_unique_constraints("flow")]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "unique_flow_name" in constraints_names:
batch_op.drop_constraint("unique_flow_name", type_="unique")
if "unique_flow_endpoint_name" in constraints_names:
batch_op.drop_constraint("unique_flow_endpoint_name", type_="unique")
if "ix_flow_endpoint_name" in indexes_names:
batch_op.drop_index(batch_op.f("ix_flow_endpoint_name"))
batch_op.create_index("ix_flow_endpoint_name", ["endpoint_name"], unique=1)

# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Add endpoint name col
Revision ID: a72f5cf9c2f9
Revises: 29fe8f1f806b
Create Date: 2024-05-29 21:44:04.240816
"""

from typing import Sequence, Union

import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision: str = "a72f5cf9c2f9"
down_revision: Union[str, None] = "29fe8f1f806b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
indexes = inspector.get_indexes("flow")
index_names = [index["name"] for index in indexes]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "endpoint_name" not in column_names:
batch_op.add_column(sa.Column("endpoint_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True))
if "ix_flow_endpoint_name" not in index_names:
batch_op.create_index(batch_op.f("ix_flow_endpoint_name"), ["endpoint_name"], unique=True)

# ### end Alembic commands ###


def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
column_names = [column["name"] for column in inspector.get_columns("flow")]
indexes = inspector.get_indexes("flow")
index_names = [index["name"] for index in indexes]
with op.batch_alter_table("flow", schema=None) as batch_op:
if "ix_flow_endpoint_name" in index_names:
batch_op.drop_index(batch_op.f("ix_flow_endpoint_name"))
if "endpoint_name" in column_names:
batch_op.drop_column("endpoint_name")

# ### end Alembic commands ###
26 changes: 21 additions & 5 deletions src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ def get_all(
raise HTTPException(status_code=500, detail=str(exc)) from exc


@router.post("/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
@router.post("/run/{flow_id_or_name}", response_model=RunResponse, response_model_exclude_none=True)
async def simplified_run_flow(
db: Annotated[Session, Depends(get_session)],
flow_id: UUID,
flow_id_or_name: str,
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: User = Depends(api_key_security),
Expand Down Expand Up @@ -111,8 +111,21 @@ async def simplified_run_flow(
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching.
"""
session_id = input_request.session_id

endpoint_name = None
flow_id_str = None
try:
try:
flow_id = UUID(flow_id_or_name)

except ValueError:
endpoint_name = flow_id_or_name
flow = db.exec(
select(Flow).where(Flow.endpoint_name == endpoint_name).where(Flow.user_id == api_key_user.id)
).first()
if flow is None:
raise ValueError(f"Flow with endpoint name {endpoint_name} not found")
flow_id = flow.id

flow_id_str = str(flow_id)
artifacts = {}
if input_request.session_id:
Expand Down Expand Up @@ -172,10 +185,13 @@ async def simplified_run_flow(
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if f"Flow {flow_id_str} not found" in str(exc):
if flow_id_str and f"Flow {flow_id_str} not found" in str(exc):
logger.error(f"Flow {flow_id_str} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif f"Session {session_id} not found" in str(exc):
elif endpoint_name and f"Flow with endpoint name {endpoint_name} not found" in str(exc):
logger.error(f"Flow with endpoint name {endpoint_name} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
elif session_id and f"Session {session_id} not found" in str(exc):
logger.error(f"Session {session_id} not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
else:
Expand Down
67 changes: 43 additions & 24 deletions src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,49 @@ def update_flow(
settings_service=Depends(get_settings_service),
):
"""Update a flow."""

db_flow = read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
settings_service=settings_service,
)
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.model_dump(exclude_unset=True)
if settings_service.settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
try:
db_flow = read_flow(
session=session,
flow_id=flow_id,
current_user=current_user,
settings_service=settings_service,
)
if not db_flow:
raise HTTPException(status_code=404, detail="Flow not found")
flow_data = flow.model_dump(exclude_unset=True)
if settings_service.settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
return db_flow
except Exception as e:
# If it is a validation error, return the error message
if hasattr(e, "errors"):
raise HTTPException(status_code=400, detail=str(e)) from e
elif "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0]
# UNIQUE constraint failed: flow.user_id, flow.name
# or UNIQUE constraint failed: flow.name
# if the column has id in it, we want the other column
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0]

raise HTTPException(
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique"
) from e
elif isinstance(e, HTTPException):
raise e
else:
raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/{flow_id}", status_code=200)
Expand Down
44 changes: 44 additions & 0 deletions src/backend/base/langflow/services/database/models/flow/model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Path: src/backend/langflow/services/database/models/flow/model.py

import re
import warnings
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, Optional
from uuid import UUID, uuid4

import emoji
from emoji import purely_emoji # type: ignore
from fastapi import HTTPException, status
from pydantic import field_serializer, field_validator
from sqlalchemy import UniqueConstraint
from sqlmodel import JSON, Column, Field, Relationship, SQLModel

from langflow.schema.schema import Record
Expand All @@ -26,6 +29,24 @@ class FlowBase(SQLModel):
is_component: Optional[bool] = Field(default=False, nullable=True)
updated_at: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=True)
folder_id: Optional[UUID] = Field(default=None, nullable=True)
endpoint_name: Optional[str] = Field(default=None, nullable=True, index=True)

@field_validator("endpoint_name")
@classmethod
def validate_endpoint_name(cls, v):
# Endpoint name must be a string containing only letters, numbers, hyphens, and underscores
if v is not None:
if not isinstance(v, str):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must be a string",
)
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must contain only letters, numbers, hyphens, and underscores",
)
return v

@field_validator("icon_bg_color")
def validate_icon_bg_color(cls, v):
Expand Down Expand Up @@ -128,6 +149,11 @@ def to_record(self):
record = Record(data=data)
return record

__table_args__ = (
UniqueConstraint("user_id", "name", name="unique_flow_name"),
UniqueConstraint("user_id", "endpoint_name", name="unique_flow_endpoint_name"),
)


class FlowCreate(FlowBase):
user_id: Optional[UUID] = None
Expand All @@ -145,3 +171,21 @@ class FlowUpdate(SQLModel):
description: Optional[str] = None
data: Optional[Dict] = None
folder_id: Optional[UUID] = None
endpoint_name: Optional[str] = None

@field_validator("endpoint_name")
@classmethod
def validate_endpoint_name(cls, v):
# Endpoint name must be a string containing only letters, numbers, hyphens, and underscores
if v is not None:
if not isinstance(v, str):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must be a string",
)
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Endpoint name must contain only letters, numbers, hyphens, and underscores",
)
return v
Loading

0 comments on commit 94e8bf1

Please sign in to comment.