diff --git a/ui/src/components/pages/Pipeline/partials/Graph/index.tsx b/ui/src/components/pages/Pipeline/partials/Graph/index.tsx index 88d84ca2b..08a2b3ea1 100644 --- a/ui/src/components/pages/Pipeline/partials/Graph/index.tsx +++ b/ui/src/components/pages/Pipeline/partials/Graph/index.tsx @@ -26,8 +26,8 @@ import { GraphProps } from "../../../../../types/declarations/graph"; import "reactflow/dist/style.css"; import "./style.css"; -const nodeWidth = 172; -const nodeHeight = 36; +const nodeWidth = 230; +const nodeHeight = 48; const graphDirection = "LR"; const defaultNodeTypes: NodeTypes = { diff --git a/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomEdge/index.tsx b/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomEdge/index.tsx index 461f1b557..99b736655 100644 --- a/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomEdge/index.tsx +++ b/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomEdge/index.tsx @@ -15,6 +15,7 @@ const CustomEdge: FC = ({ sourcePosition, targetPosition, data, + markerEnd, }) => { const obj = getSmoothStepPath({ sourceX, @@ -25,8 +26,53 @@ const CustomEdge: FC = ({ targetPosition, }); - const [edgePath, labelX] = obj; - let [, , labelY] = obj; + const [, labelX] = obj; + let [edgePath, , labelY] = obj; + let labelRenderer = ""; + + if (data?.fwdEdge) { + if (sourceY !== targetY) { + if (data?.fromNodeOutDegree > 1) { + if (sourceY < targetY) { + labelY = targetY - 14; + } else { + labelY = targetY + 2; + } + } else { + if (sourceY < targetY) { + labelY = targetY - 47; + } else { + labelY = targetY + 36; + } + } + } + labelRenderer = `translate(-50%, -50%) translate(${labelX}px,${labelY}px)`; + } + + if (data?.backEdge) { + const height = data?.backEdgeHeight * 35; + edgePath = `M ${sourceX} ${sourceY} L ${sourceX} ${ + -height + 5 + } Q ${sourceX} ${-height} ${sourceX - 5} ${-height} L ${ + targetX + 5 + } ${-height} Q ${targetX} ${-height} ${targetX} ${ + -height + 5 + } L ${targetX} ${targetY}`; + const centerX = (sourceX + targetX) / 2; + labelRenderer = `translate(-50%, -50%) translate(${centerX}px,${-height}px)`; + } + + if (data?.selfEdge) { + edgePath = `M ${sourceX} ${sourceY} L ${sourceX} ${ + sourceY - 15 + } Q ${sourceX} ${sourceY - 20} ${sourceX - 5} ${sourceY - 20} L ${ + targetX + 5 + } ${targetY - 20} Q ${targetX} ${sourceY - 20} ${targetX} ${ + sourceY - 15 + } L ${targetX} ${sourceY}`; + const centerX = (sourceX + targetX) / 2; + labelRenderer = `translate(-50%, -50%) translate(${centerX}px,${labelY}px)`; + } // Highlight the edge on isFull - thick red line let edgeStyle, wmStyle, pendingStyle; @@ -39,17 +85,12 @@ const CustomEdge: FC = ({ }; } - if (sourceY === targetY) { + if (sourceY === targetY || data?.backEdge) { wmStyle = { marginTop: "-0.87rem" }; pendingStyle = { marginTop: "0.15rem" }; } else { wmStyle = { right: "0.1rem" }; pendingStyle = { left: "0.1rem" }; - if (sourceY < targetY) { - labelY = targetY - 14; - } else { - labelY = targetY + 2; - } } const getMinWM = () => { if (data?.edgeWatermark?.watermarks) { @@ -101,13 +142,14 @@ const CustomEdge: FC = ({ d={edgePath} style={edgeStyle} data-testid={id} + markerEnd={markerEnd} />
{data?.edgeWatermark?.isWaterMarkEnabled && ( diff --git a/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomNode/index.tsx b/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomNode/index.tsx index 5a64be910..1446d1ae0 100644 --- a/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomNode/index.tsx +++ b/ui/src/components/pages/Pipeline/partials/Graph/partials/CustomNode/index.tsx @@ -6,7 +6,7 @@ import { GetNodeInfoValueComponent } from "./partials/NodeLabelInfo"; import "reactflow/dist/style.css"; import "./style.css"; -const getColor = (nodeType) => { +const getColor = (nodeType: string) => { return nodeType === "source" ? "#34BFFF" : nodeType === "udf" @@ -14,7 +14,7 @@ const getColor = (nodeType) => { : "#82A9C9"; }; -const getBorderColor = (nodeType) => { +const getBorderColor = (nodeType: string) => { return nodeType === "source" ? "#2382ad" : nodeType === "udf" @@ -89,6 +89,7 @@ const CustomNode: FC = ({ {(data?.type === "udf" || data?.type === "sink") && ( @@ -96,10 +97,37 @@ const CustomNode: FC = ({ {(data?.type === "source" || data?.type === "udf") && ( )} + {data?.centerSourceHandle && ( + + )} + {data?.centerTargetHandle && ( + + )} + {data?.quadHandle && ( + <> + + + + )}
diff --git a/ui/src/utils/fetcherHooks/pipelineViewFetch.ts b/ui/src/utils/fetcherHooks/pipelineViewFetch.ts index b0192dd5d..ebead9cce 100644 --- a/ui/src/utils/fetcherHooks/pipelineViewFetch.ts +++ b/ui/src/utils/fetcherHooks/pipelineViewFetch.ts @@ -1,5 +1,5 @@ import { useCallback, useEffect, useMemo, useState } from "react"; -import { Edge, Node } from "reactflow"; +import { Edge, MarkerType, Node } from "reactflow"; import { isEqual } from "lodash"; import { BufferInfo, @@ -25,6 +25,18 @@ export const usePipelineViewFetch = ( const [edgeWatermark, setEdgeWatermark] = useState< Map >(new Map()); + const [selfEdges, setSelfEdges] = useState>(new Set()); + const [backEdges, setBackEdges] = useState>(new Set()); + const [fwdEdges, setFwdEdges] = useState>(new Set()); + const [selfVertices, setSelfVertices] = useState>(new Set()); + const [prevVertices, setPrevVertices] = useState>(new Set()); + const [nextVertices, setNextVertices] = useState>(new Set()); + const [backEdgesHeight, setBackEdgesHeight] = useState>( + new Map() + ); + const [nodeOutDegree, setNodeOutDegree] = useState>( + new Map() + ); const [pipelineErr, setPipelineErr] = useState(undefined); const [buffersErr, setBuffersErr] = useState(undefined); const [podsErr, setPodsErr] = useState(undefined); @@ -299,6 +311,93 @@ export const usePipelineViewFetch = ( return () => clearInterval(interval); }, [getPipelineWatermarks]); + const setPipelineDetails = useCallback(() => { + const sourceVertices: string[] = []; + spec?.vertices?.forEach((vertex) => { + if (vertex?.source) { + sourceVertices.push(vertex?.name); + } + }); + + // directed graph is represented as an adjacency list + const adjacencyList: { [key: string]: string[] } = {}; + if (spec?.vertices && spec?.edges) { + spec?.edges?.forEach((edge) => { + if (!adjacencyList[edge?.from]) { + adjacencyList[edge?.from] = []; + } + + adjacencyList[edge?.from].push(edge?.to); + }); + + const selfEdges: Set = new Set(); + const backEdges: Set = new Set(); + const forwardEdges: Set = new Set(); + const selfVertices: Set = new Set(); + const prevVertices: Set = new Set(); + const nextVertices: Set = new Set(); + const backEdgesHeight = new Map(); + + const visited: Set = new Set(); + const recStack: Set = new Set(); + let height = 1; + const dfs = (node: string) => { + visited.add(node); + recStack.add(node); + adjacencyList[node]?.forEach((child: string) => { + const id = `${node}-${child}`; + if (node === child) { + selfEdges.add(id); + selfVertices.add(node); + return; + } + if (recStack.has(child)) { + backEdges.add(id); + backEdgesHeight.set(id, height); + height++; + nextVertices.add(node); + prevVertices.add(child); + return; + } + if (!recStack.has(child)) { + forwardEdges.add(id); + if (!visited.has(child)) dfs(child); + } + }); + recStack.delete(node); + }; + sourceVertices?.forEach((vertex: any) => { + if (!visited.has(vertex)) { + dfs(vertex); + } + }); + + const nodeOutDegree: Map = new Map(); + spec?.edges.forEach((edge) => { + if (forwardEdges.has(`${edge?.from}-${edge?.to}`)) { + nodeOutDegree.set( + edge?.from, + (nodeOutDegree.get(edge?.from) || 0) + 1 + ); + } + }); + + setSelfEdges(selfEdges); + setBackEdges(backEdges); + setFwdEdges(forwardEdges); + setSelfVertices(selfVertices); + setPrevVertices(prevVertices); + setNextVertices(nextVertices); + setBackEdgesHeight(backEdgesHeight); + setNodeOutDegree(nodeOutDegree); + } + }, [spec]); + + // This useEffect is used to update edges and vertices types + useEffect(() => { + setPipelineDetails(); + }, [setPipelineDetails]); + const vertices = useMemo(() => { const newVertices: Node[] = []; if (spec?.vertices && buffers && vertexPods && vertexMetrics) { @@ -335,11 +434,24 @@ export const usePipelineViewFetch = ( } }); if (newNode.data.buffers.length === 0) newNode.data.buffers = null; + // added handles(connector points) for self loops and cycles + newNode.data.centerSourceHandle = nextVertices.has(newNode.id); + newNode.data.centerTargetHandle = prevVertices.has(newNode.id); + newNode.data.quadHandle = selfVertices.has(newNode.id); newVertices.push(newNode); }); } return newVertices; - }, [spec, buffers, vertexPods, vertexMetrics, ns_pl]); + }, [ + spec, + buffers, + vertexPods, + vertexMetrics, + ns_pl, + prevVertices, + selfVertices, + nextVertices, + ]); const edges = useMemo(() => { const newEdges: Edge[] = []; @@ -369,6 +481,12 @@ export const usePipelineViewFetch = ( spec.edges.forEach((edge: any) => { const id = edge?.from + "-" + edge?.to; + const markerEnd = { + type: MarkerType.Arrow, + width: 15, + height: 15, + color: "black", + }; const pipelineEdge = { id, source: edge?.from, @@ -377,6 +495,13 @@ export const usePipelineViewFetch = ( conditions: edge?.conditions, backpressureLabel: edgeBackpressureLabel.get(edge?.to), isFull: edgeIsFull.get(edge?.to), + source: edge?.from, + target: edge?.to, + fwdEdge: fwdEdges.has(id), + backEdge: backEdges.has(id), + selfEdge: selfEdges.has(id), + backEdgeHeight: backEdgesHeight.get(id) || 0, + fromNodeOutDegree: nodeOutDegree.get(edge?.from) || 0, }, } as Edge; pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id) @@ -384,11 +509,33 @@ export const usePipelineViewFetch = ( : null; pipelineEdge.animated = true; pipelineEdge.type = "custom"; + if (backEdges.has(id)) { + pipelineEdge.sourceHandle = "1"; + pipelineEdge.targetHandle = "1"; + pipelineEdge.markerEnd = markerEnd; + } else if (selfEdges.has(id)) { + pipelineEdge.sourceHandle = "2"; + pipelineEdge.targetHandle = "2"; + pipelineEdge.markerEnd = markerEnd; + } else if (fwdEdges.has(id)) { + pipelineEdge.sourceHandle = "0"; + pipelineEdge.targetHandle = "0"; + } newEdges.push(pipelineEdge); }); } return newEdges; - }, [spec, buffers, edgeWatermark, ns_pl]); + }, [ + spec, + buffers, + edgeWatermark, + ns_pl, + backEdges, + selfEdges, + fwdEdges, + backEdgesHeight, + nodeOutDegree, + ]); //sets loading variable useEffect(() => {