Skip to content

Commit

Permalink
feat: join vertex UI support (#1010)
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <guptavedant2312@gmail.com>
  • Loading branch information
veds-g committed Sep 1, 2023
1 parent c3d8498 commit 247b89e
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 16 deletions.
4 changes: 2 additions & 2 deletions ui/src/components/pages/Pipeline/partials/Graph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import { GraphProps } from "../../../../../types/declarations/graph";
import "reactflow/dist/style.css";
import "./style.css";

const nodeWidth = 172;
const nodeHeight = 36;
const nodeWidth = 230;
const nodeHeight = 48;
const graphDirection = "LR";

const defaultNodeTypes: NodeTypes = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const CustomEdge: FC<EdgeProps> = ({
sourcePosition,
targetPosition,
data,
markerEnd,
}) => {
const obj = getSmoothStepPath({
sourceX,
Expand All @@ -25,8 +26,53 @@ const CustomEdge: FC<EdgeProps> = ({
targetPosition,
});

const [edgePath, labelX] = obj;
let [, , labelY] = obj;
const [, labelX] = obj;
let [edgePath, , labelY] = obj;
let labelRenderer = "";

if (data?.fwdEdge) {
if (sourceY !== targetY) {
if (data?.fromNodeOutDegree > 1) {
if (sourceY < targetY) {
labelY = targetY - 14;
} else {
labelY = targetY + 2;
}
} else {
if (sourceY < targetY) {
labelY = targetY - 47;
} else {
labelY = targetY + 36;
}
}
}
labelRenderer = `translate(-50%, -50%) translate(${labelX}px,${labelY}px)`;
}

if (data?.backEdge) {
const height = data?.backEdgeHeight * 35;
edgePath = `M ${sourceX} ${sourceY} L ${sourceX} ${
-height + 5
} Q ${sourceX} ${-height} ${sourceX - 5} ${-height} L ${
targetX + 5
} ${-height} Q ${targetX} ${-height} ${targetX} ${
-height + 5
} L ${targetX} ${targetY}`;
const centerX = (sourceX + targetX) / 2;
labelRenderer = `translate(-50%, -50%) translate(${centerX}px,${-height}px)`;
}

if (data?.selfEdge) {
edgePath = `M ${sourceX} ${sourceY} L ${sourceX} ${
sourceY - 15
} Q ${sourceX} ${sourceY - 20} ${sourceX - 5} ${sourceY - 20} L ${
targetX + 5
} ${targetY - 20} Q ${targetX} ${sourceY - 20} ${targetX} ${
sourceY - 15
} L ${targetX} ${sourceY}`;
const centerX = (sourceX + targetX) / 2;
labelRenderer = `translate(-50%, -50%) translate(${centerX}px,${labelY}px)`;
}

// Highlight the edge on isFull - thick red line
let edgeStyle, wmStyle, pendingStyle;
Expand All @@ -39,17 +85,12 @@ const CustomEdge: FC<EdgeProps> = ({
};
}

if (sourceY === targetY) {
if (sourceY === targetY || data?.backEdge) {
wmStyle = { marginTop: "-0.87rem" };
pendingStyle = { marginTop: "0.15rem" };
} else {
wmStyle = { right: "0.1rem" };
pendingStyle = { left: "0.1rem" };
if (sourceY < targetY) {
labelY = targetY - 14;
} else {
labelY = targetY + 2;
}
}
const getMinWM = () => {
if (data?.edgeWatermark?.watermarks) {
Expand Down Expand Up @@ -101,13 +142,14 @@ const CustomEdge: FC<EdgeProps> = ({
d={edgePath}
style={edgeStyle}
data-testid={id}
markerEnd={markerEnd}
/>
</svg>
<EdgeLabelRenderer>
<div
className={"edge"}
style={{
transform: `translate(-50%, -50%) translate(${labelX}px,${labelY}px)`,
transform: labelRenderer,
}}
>
{data?.edgeWatermark?.isWaterMarkEnabled && (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import { GetNodeInfoValueComponent } from "./partials/NodeLabelInfo";
import "reactflow/dist/style.css";
import "./style.css";

const getColor = (nodeType) => {
const getColor = (nodeType: string) => {
return nodeType === "source"
? "#34BFFF"
: nodeType === "udf"
? "#82DBE4"
: "#82A9C9";
};

const getBorderColor = (nodeType) => {
const getBorderColor = (nodeType: string) => {
return nodeType === "source"
? "#2382ad"
: nodeType === "udf"
Expand Down Expand Up @@ -89,17 +89,45 @@ const CustomNode: FC<NodeProps> = ({
{(data?.type === "udf" || data?.type === "sink") && (
<Handle
type="target"
id="0"
position={targetPosition}
isConnectable={isConnectable}
/>
)}
{(data?.type === "source" || data?.type === "udf") && (
<Handle
type="source"
id="0"
position={sourcePosition}
isConnectable={isConnectable}
/>
)}
{data?.centerSourceHandle && (
<Handle type="source" id="1" position={Position.Top} />
)}
{data?.centerTargetHandle && (
<Handle type="target" id="1" position={Position.Top} />
)}
{data?.quadHandle && (
<>
<Handle
type="source"
id="2"
position={Position.Top}
style={{
left: "75%",
}}
/>
<Handle
type="target"
id="2"
position={Position.Top}
style={{
left: "25%",
}}
/>
</>
)}
</div>
</Tooltip>
</div>
Expand Down
153 changes: 150 additions & 3 deletions ui/src/utils/fetcherHooks/pipelineViewFetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { useCallback, useEffect, useMemo, useState } from "react";
import { Edge, Node } from "reactflow";
import { Edge, MarkerType, Node } from "reactflow";
import { isEqual } from "lodash";
import {
BufferInfo,
Expand All @@ -25,6 +25,18 @@ export const usePipelineViewFetch = (
const [edgeWatermark, setEdgeWatermark] = useState<
Map<string, EdgeWatermark>
>(new Map());
const [selfEdges, setSelfEdges] = useState<Set<string>>(new Set());
const [backEdges, setBackEdges] = useState<Set<string>>(new Set());
const [fwdEdges, setFwdEdges] = useState<Set<string>>(new Set());
const [selfVertices, setSelfVertices] = useState<Set<string>>(new Set());
const [prevVertices, setPrevVertices] = useState<Set<string>>(new Set());
const [nextVertices, setNextVertices] = useState<Set<string>>(new Set());
const [backEdgesHeight, setBackEdgesHeight] = useState<Map<string, number>>(
new Map()
);
const [nodeOutDegree, setNodeOutDegree] = useState<Map<string, number>>(
new Map()
);
const [pipelineErr, setPipelineErr] = useState<any[] | undefined>(undefined);
const [buffersErr, setBuffersErr] = useState<any[] | undefined>(undefined);
const [podsErr, setPodsErr] = useState<any[] | undefined>(undefined);
Expand Down Expand Up @@ -299,6 +311,93 @@ export const usePipelineViewFetch = (
return () => clearInterval(interval);
}, [getPipelineWatermarks]);

const setPipelineDetails = useCallback(() => {
const sourceVertices: string[] = [];
spec?.vertices?.forEach((vertex) => {
if (vertex?.source) {
sourceVertices.push(vertex?.name);
}
});

// directed graph is represented as an adjacency list
const adjacencyList: { [key: string]: string[] } = {};
if (spec?.vertices && spec?.edges) {
spec?.edges?.forEach((edge) => {
if (!adjacencyList[edge?.from]) {
adjacencyList[edge?.from] = [];
}

adjacencyList[edge?.from].push(edge?.to);
});

const selfEdges: Set<string> = new Set();
const backEdges: Set<string> = new Set();
const forwardEdges: Set<string> = new Set();
const selfVertices: Set<string> = new Set();
const prevVertices: Set<string> = new Set();
const nextVertices: Set<string> = new Set();
const backEdgesHeight = new Map();

const visited: Set<string> = new Set();
const recStack: Set<string> = new Set();
let height = 1;
const dfs = (node: string) => {
visited.add(node);
recStack.add(node);
adjacencyList[node]?.forEach((child: string) => {
const id = `${node}-${child}`;
if (node === child) {
selfEdges.add(id);
selfVertices.add(node);
return;
}
if (recStack.has(child)) {
backEdges.add(id);
backEdgesHeight.set(id, height);
height++;
nextVertices.add(node);
prevVertices.add(child);
return;
}
if (!recStack.has(child)) {
forwardEdges.add(id);
if (!visited.has(child)) dfs(child);
}
});
recStack.delete(node);
};
sourceVertices?.forEach((vertex: any) => {
if (!visited.has(vertex)) {
dfs(vertex);
}
});

const nodeOutDegree: Map<string, number> = new Map();
spec?.edges.forEach((edge) => {
if (forwardEdges.has(`${edge?.from}-${edge?.to}`)) {
nodeOutDegree.set(
edge?.from,
(nodeOutDegree.get(edge?.from) || 0) + 1
);
}
});

setSelfEdges(selfEdges);
setBackEdges(backEdges);
setFwdEdges(forwardEdges);
setSelfVertices(selfVertices);
setPrevVertices(prevVertices);
setNextVertices(nextVertices);
setBackEdgesHeight(backEdgesHeight);
setNodeOutDegree(nodeOutDegree);
}
}, [spec]);

// This useEffect is used to update edges and vertices types
useEffect(() => {
setPipelineDetails();
}, [setPipelineDetails]);

const vertices = useMemo(() => {
const newVertices: Node[] = [];
if (spec?.vertices && buffers && vertexPods && vertexMetrics) {
Expand Down Expand Up @@ -335,11 +434,24 @@ export const usePipelineViewFetch = (
}
});
if (newNode.data.buffers.length === 0) newNode.data.buffers = null;
// added handles(connector points) for self loops and cycles
newNode.data.centerSourceHandle = nextVertices.has(newNode.id);
newNode.data.centerTargetHandle = prevVertices.has(newNode.id);
newNode.data.quadHandle = selfVertices.has(newNode.id);
newVertices.push(newNode);
});
}
return newVertices;
}, [spec, buffers, vertexPods, vertexMetrics, ns_pl]);
}, [
spec,
buffers,
vertexPods,
vertexMetrics,
ns_pl,
prevVertices,
selfVertices,
nextVertices,
]);

const edges = useMemo(() => {
const newEdges: Edge[] = [];
Expand Down Expand Up @@ -369,6 +481,12 @@ export const usePipelineViewFetch = (

spec.edges.forEach((edge: any) => {
const id = edge?.from + "-" + edge?.to;
const markerEnd = {
type: MarkerType.Arrow,
width: 15,
height: 15,
color: "black",
};
const pipelineEdge = {
id,
source: edge?.from,
Expand All @@ -377,18 +495,47 @@ export const usePipelineViewFetch = (
conditions: edge?.conditions,
backpressureLabel: edgeBackpressureLabel.get(edge?.to),
isFull: edgeIsFull.get(edge?.to),
source: edge?.from,
target: edge?.to,
fwdEdge: fwdEdges.has(id),
backEdge: backEdges.has(id),
selfEdge: selfEdges.has(id),
backEdgeHeight: backEdgesHeight.get(id) || 0,
fromNodeOutDegree: nodeOutDegree.get(edge?.from) || 0,
},
} as Edge;
pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id)
? edgeWatermark.get(pipelineEdge.id)
: null;
pipelineEdge.animated = true;
pipelineEdge.type = "custom";
if (backEdges.has(id)) {
pipelineEdge.sourceHandle = "1";
pipelineEdge.targetHandle = "1";
pipelineEdge.markerEnd = markerEnd;
} else if (selfEdges.has(id)) {
pipelineEdge.sourceHandle = "2";
pipelineEdge.targetHandle = "2";
pipelineEdge.markerEnd = markerEnd;
} else if (fwdEdges.has(id)) {
pipelineEdge.sourceHandle = "0";
pipelineEdge.targetHandle = "0";
}
newEdges.push(pipelineEdge);
});
}
return newEdges;
}, [spec, buffers, edgeWatermark, ns_pl]);
}, [
spec,
buffers,
edgeWatermark,
ns_pl,
backEdges,
selfEdges,
fwdEdges,
backEdgesHeight,
nodeOutDegree,
]);

//sets loading variable
useEffect(() => {
Expand Down

0 comments on commit 247b89e

Please sign in to comment.