Skip to content

Commit

Permalink
feat: migrate vertex_builds to sql database
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Jul 29, 2024
1 parent 77786d9 commit d3985f3
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""create vertex_builds table
Revision ID: 0d60fcbd4e8e
Revises: 90be8e2ed91e
Create Date: 2024-07-26 11:41:31.274271
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import sqlmodel
from sqlalchemy.engine.reflection import Inspector

Check failure on line 13 in src/backend/base/langflow/alembic/versions/0d60fcbd4e8e_create_vertex_builds_table.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (F401)

src/backend/base/langflow/alembic/versions/0d60fcbd4e8e_create_vertex_builds_table.py:13:42: F401 `sqlalchemy.engine.reflection.Inspector` imported but unused
from langflow.utils import migration


# revision identifiers, used by Alembic.
revision: str = '0d60fcbd4e8e'
down_revision: Union[str, None] = '90be8e2ed91e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
if not migration.table_exists("vertex_build", conn):
op.create_table(
"vertex_build",
sa.Column("timestamp", sa.DateTime(), nullable=False),
sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("data", sa.JSON(), nullable=True),
sa.Column("artifacts", sa.JSON(), nullable=True),
sa.Column("params", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("build_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("flow_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("valid", sqlmodel.sql.sqltypes.BOOLEAN(), nullable=False),
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
),
sa.PrimaryKeyConstraint("build_id"),
)
pass


def downgrade() -> None:
conn = op.get_bind()
if migration.table_exists("vertex_build", conn):
op.drop_table("vertex_build")
pass
4 changes: 2 additions & 2 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
)
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.graph.base import Graph
from langflow.graph.utils import log_vertex_build
from langflow.schema.schema import OutputValue
from langflow.services.auth.utils import get_current_active_user
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service, get_session, get_session_service, get_telemetry_service
from langflow.services.monitor.utils import log_vertex_build
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
from langflow.services.telemetry.service import TelemetryService

Expand Down Expand Up @@ -233,7 +233,7 @@ async def build_vertex(
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id_str,
vertex_id=vertex_id.split("-")[0],
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_data_response,
Expand Down
27 changes: 11 additions & 16 deletions src/backend/base/langflow/api/v1/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,34 @@
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.models.transactions.model import TransactionReadResponse
from langflow.services.database.models.user.model import User
from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_flow_id, \
delete_vertex_builds_by_flow_id
from langflow.services.database.models.vertex_builds.model import VertexBuildMapModel
from langflow.services.deps import get_monitor_service, get_session

Check failure on line 16 in src/backend/base/langflow/api/v1/monitor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (F401)

src/backend/base/langflow/api/v1/monitor.py:16:36: F401 `langflow.services.deps.get_monitor_service` imported but unused
from langflow.services.monitor.schema import MessageModelResponse, VertexBuildMapModel
from langflow.services.monitor.service import MonitorService
from langflow.services.monitor.schema import MessageModelResponse

router = APIRouter(prefix="/monitor", tags=["Monitor"])


# Get vertex_builds data from the monitor service
@router.get("/builds", response_model=VertexBuildMapModel)
async def get_vertex_builds(
flow_id: Optional[str] = Query(None),
vertex_id: Optional[str] = Query(None),
valid: Optional[bool] = Query(None),
order_by: Optional[str] = Query("timestamp"),
monitor_service: MonitorService = Depends(get_monitor_service),
flow_id: UUID = Query(),
session: Session = Depends(get_session),
):
try:
vertex_build_dicts = monitor_service.get_vertex_builds(
flow_id=flow_id, vertex_id=vertex_id, valid=valid, order_by=order_by
)
vertex_build_map = VertexBuildMapModel.from_list_of_dicts(vertex_build_dicts)
return vertex_build_map
vertex_builds = get_vertex_builds_by_flow_id(session, flow_id)
return VertexBuildMapModel.from_list_of_dicts(vertex_builds)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.delete("/builds", status_code=204)
async def delete_vertex_builds(
flow_id: Optional[str] = Query(None),
monitor_service: MonitorService = Depends(get_monitor_service),
flow_id: UUID = Query(),
session: Session = Depends(get_session),
):
try:
monitor_service.delete_vertex_builds(flow_id=flow_id)
delete_vertex_builds_by_flow_id(session, flow_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

Expand Down
8 changes: 0 additions & 8 deletions src/backend/base/langflow/graph/edge/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pydantic import BaseModel, Field, field_validator

from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.monitor.utils import log_message

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
Expand Down Expand Up @@ -224,13 +223,6 @@ async def get_result_from_source(self, source: "Vertex", target: "Vertex"):
):
if target.params.get("message") == "":
return self.result
await log_message(
sender=target.params.get("sender", ""),
sender_name=target.params.get("sender_name", ""),
message=target.params.get(INPUT_FIELD_NAME, {}),
session_id=target.params.get("session_id", ""),
flow_id=target.graph.flow_id,
)
return self.result

def __repr__(self) -> str:
Expand Down
27 changes: 27 additions & 0 deletions src/backend/base/langflow/graph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from langflow.schema.message import Message
from langflow.services.database.models.transactions.model import TransactionBase
from langflow.services.database.models.transactions.crud import log_transaction as crud_log_transaction
from langflow.services.database.models.vertex_builds.crud import log_vertex_build as crud_log_vertex_build
from langflow.services.database.models.vertex_builds.model import VertexBuildBase
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from loguru import logger
Expand Down Expand Up @@ -145,3 +147,28 @@ async def log_transaction(
logger.debug(f"Logged transaction: {inserted.id}")
except Exception as e:
logger.error(f"Error logging transaction: {e}")


def log_vertex_build(
flow_id: str,
vertex_id: str,
valid: bool,
params: Any,
data: "ResultDataResponse",

Check failure on line 157 in src/backend/base/langflow/graph/utils.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (F821)

src/backend/base/langflow/graph/utils.py:157:12: F821 Undefined name `ResultDataResponse`
artifacts: Optional[dict] = None,
):
try:
vertex_build = VertexBuildBase(
flow_id=flow_id,
id=vertex_id,
valid=valid,
params=str(params) if params else None,
# ugly hack to get the model dump with weird datatypes
data=json.loads(data.model_dump_json()),
artifacts=artifacts,
)
with session_getter(get_db_service()) as session:
inserted = crud_log_vertex_build(session, vertex_build)
logger.debug(f"Logged vertex build: {inserted.build_id}")
except Exception as e:
logger.exception(f"Error logging vertex build: {e}")
7 changes: 3 additions & 4 deletions src/backend/base/langflow/graph/vertex/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
from loguru import logger

from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, serialize_field, log_transaction
from langflow.graph.utils import UnbuiltObject, serialize_field, log_transaction, log_vertex_build
from langflow.graph.vertex.base import Vertex
from langflow.schema import Data
from langflow.schema.artifact import ArtifactType
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.monitor.utils import log_vertex_build
from langflow.template.field.base import UNDEFINED
from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse
from langflow.utils.util import unescape_string
Expand Down Expand Up @@ -389,14 +388,14 @@ async def stream(self):
if isinstance(value, (AsyncIterator, Iterator)):
origin_vertex.results[key] = complete_message

await log_vertex_build(
asyncio.create_task(log_vertex_build(
flow_id=self.graph.flow_id,
vertex_id=self.id,
valid=True,
params=self._built_object_repr(),
data=self.result,
artifacts=self.artifacts,
)
))

self._validate_built_object()
self._built = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlmodel import JSON, Column, Field, Relationship, SQLModel

from langflow.schema import Data
from langflow.services.database.models.vertex_builds.model import VertexBuildTable

if TYPE_CHECKING:
from langflow.services.database.models.folder import Folder
Expand Down Expand Up @@ -145,6 +146,7 @@ class Flow(FlowBase, table=True):
folder: Optional["Folder"] = Relationship(back_populates="flows")
messages: List["MessageTable"] = Relationship(back_populates="flow")
transactions: List["TransactionTable"] = Relationship(back_populates="flow")
vertex_builds: List["VertexBuildTable"] = Relationship(back_populates="flow")

def to_data(self):
serialized = self.model_dump()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .model import VertexBuildTable

__all__ = ["VertexBuildTable"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Optional
from uuid import UUID

from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, select
from sqlalchemy import delete

from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable

Check failure on line 8 in src/backend/base/langflow/services/database/models/vertex_builds/crud.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (F401)

src/backend/base/langflow/services/database/models/vertex_builds/crud.py:8:66: F401 `langflow.services.database.models.transactions.model.TransactionBase` imported but unused

Check failure on line 8 in src/backend/base/langflow/services/database/models/vertex_builds/crud.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (F401)

src/backend/base/langflow/services/database/models/vertex_builds/crud.py:8:83: F401 `langflow.services.database.models.transactions.model.TransactionTable` imported but unused
from langflow.services.database.models.vertex_builds.model import VertexBuildBase, VertexBuildTable


def get_vertex_builds_by_flow_id(db: Session, flow_id: UUID, limit: Optional[int] = 1000) -> list[VertexBuildTable]:
stmt = (
select(VertexBuildTable)
.where(VertexBuildTable.flow_id == flow_id)
.order_by(VertexBuildTable.timestamp)
.limit(limit)
)

builds = db.exec(stmt)
return [t for t in builds]


def log_vertex_build(db: Session, vertex_build: VertexBuildBase) -> VertexBuildTable:
table = VertexBuildTable(**vertex_build.model_dump())
db.add(table)
try:
db.commit()
return table
except IntegrityError as e:
db.rollback()
raise e


def delete_vertex_builds_by_flow_id(db: Session, flow_id: UUID) -> None:
delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow_id)
db.commit()
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from uuid import UUID, uuid4

from pydantic import field_validator, BaseModel
from sqlmodel import JSON, Column, Field, Relationship, SQLModel


if TYPE_CHECKING:
from langflow.services.database.models.flow.model import Flow



class VertexBuildBase(SQLModel):
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
id: str = Field(nullable=False)
data: Optional[dict] = Field(default=None, sa_column=Column(JSON))
artifacts: Optional[dict] = Field(default=None, sa_column=Column(JSON))
params: Optional[str] = Field(nullable=True)
valid: bool = Field(nullable=False)
flow_id: UUID = Field(foreign_key="flow.id")

# Needed for Column(JSON)
class Config:
arbitrary_types_allowed = True

@field_validator("flow_id", mode="before")
@classmethod
def validate_flow_id(cls, value):
if value is None:
return value
if isinstance(value, str):
value = UUID(value)
return value


class VertexBuildTable(VertexBuildBase, table=True):
__tablename__ = "vertex_build"
build_id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
flow: "Flow" = Relationship(back_populates="vertex_builds")


class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildTable]]
@classmethod
def from_list_of_dicts(cls, vertex_build_dicts: list[VertexBuildTable]):
vertex_build_map = {}
for vertex_build in vertex_build_dicts:
if vertex_build.id not in vertex_build_map:
vertex_build_map[vertex_build.id] = []
vertex_build_map[vertex_build.id].append(vertex_build)
return cls(vertex_builds=vertex_build_map)
2 changes: 1 addition & 1 deletion src/backend/base/langflow/services/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def create_db_and_tables(self):

inspector = inspect(self.engine)
table_names = inspector.get_table_names()
current_tables = ["flow", "user", "apikey", "folder", "message", "variable", "transaction"]
current_tables = ["flow", "user", "apikey", "folder", "message", "variable", "transaction", "vertex_build"]

if table_names and all(table in table_names for table in current_tables):
logger.debug("Database and tables already exist")
Expand Down
6 changes: 0 additions & 6 deletions src/backend/base/langflow/services/monitor/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,6 @@ def serialize_dict(v):
return v


def to_map(value: dict):
keys = list(value.keys())
values = list(value.values())
return {"key": keys, "value": values}


class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildResponseModel]]

Expand Down
Loading

0 comments on commit d3985f3

Please sign in to comment.