Skip to content

Commit

Permalink
feat: add processing rate to UI (#121)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
Krithika3 authored and whynowy committed Aug 6, 2022
1 parent ffd38a1 commit 0dab55d
Show file tree
Hide file tree
Showing 8 changed files with 577 additions and 309 deletions.
82 changes: 49 additions & 33 deletions ui/src/components/pipeline/Pipeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ConnectionLineType, Edge, Node } from "react-flow-renderer";
import Graph from "./graph/Graph";
import { usePipelineFetch } from "../../utils/fetchWrappers/pipelineFetch";
import { useEdgesInfoFetch } from "../../utils/fetchWrappers/edgeInfoFetch";

import { ProcessingRates } from "../../utils/models/pipeline";
import "./Pipeline.css";

export function Pipeline() {
Expand All @@ -14,19 +14,24 @@ export function Pipeline() {

const [vertexPods, setVertexPods] = useState<Map<string, number>>(null);

const {
pipeline,
error: pipelineError,
} = usePipelineFetch(namespaceId, pipelineId, pipelineRequestKey);
const [vertexRate, setVertexRate] =
useState<Map<string, ProcessingRates>>(null);

const { pipeline, error: pipelineError } = usePipelineFetch(
namespaceId,
pipelineId,
pipelineRequestKey
);

const [edgesInfoRequestKey, setEdgesInfoRequestKey] = useState(
`${Date.now()}`
);

const {
edgesInfo,
error: edgesInfoError,
} = useEdgesInfoFetch(namespaceId, pipelineId, edgesInfoRequestKey);
const { edgesInfo, error: edgesInfoError } = useEdgesInfoFetch(
namespaceId,
pipelineId,
edgesInfoRequestKey
);

useEffect(() => {
// Refresh pipeline info every x ms
Expand Down Expand Up @@ -66,9 +71,35 @@ export function Pipeline() {
}
}, [pipeline]);

// This useEffect is used to obtain all the rates for a given vertex in a pipeline.
useEffect(() => {
const vertexToRateMap = new Map();

if (pipeline?.spec?.vertices) {
Promise.all(
pipeline?.spec?.vertices.map((vertex) => {
return fetch(
`/api/v1/namespaces/${namespaceId}/pipelines/${pipelineId}/vertices/${vertex.name}/metrics`
)
.then((response) => response.json())
.then((json) => {
const processinRates = {} as ProcessingRates;
processinRates.ratePerMin =
json["processingRates"]["1m"].toFixed(2);
processinRates.ratePerFiveMin =
json["processingRates"]["5m"].toFixed(2);
processinRates.ratePerFifteenMin =
json["processingRates"]["15m"].toFixed(2);
vertexToRateMap.set(vertex.name, processinRates);
});
})
).then(() => setVertexRate(vertexToRateMap));
}
}, [pipeline]);

const vertices = useMemo(() => {
const newVertices: Node[] = [];
if (pipeline?.spec?.vertices && vertexPods) {
if (pipeline?.spec?.vertices && vertexPods && vertexRate) {
pipeline.spec.vertices.map((vertex) => {
const podsLength = vertexPods.has(vertex.name)
? vertexPods.get(vertex.name)
Expand All @@ -86,39 +117,24 @@ export function Pipeline() {
// change this in the future if you would like to make it draggable
newNode.draggable = false;
if (vertex.source) {
newNode.type = "input";
newNode.style = {
background: "#d5dee6",
boxShadow: "1",
color: "#333",
border: "1px solid #b8cee2",
};
newNode.type = "source";
newNode.data.source = vertex;
} else if (vertex.sink) {
newNode.type = "output";
newNode.style = {
background: "#c3bbb7",
color: "#333",
border: "1px solid #a9a3a0",
};
newNode.type = "sink";
newNode.data.sink = vertex;
newNode.data.test = vertex.name;
} else {
newNode.data.udf = vertex;
newNode.style = {
background: "#efdbce",
color: "#333",
border: "1px solid #f1c5a8",
};
newNode.type = "udf";
}
newNode.style.cursor = "pointer";
newNode.style.fontFamily = "IBM Plex Sans";
newNode.style.fontWeight = 400;
newNode.style.fontSize = "0.50rem";
newNode.data.rate = vertexRate.has(vertex.name)
? vertexRate.get(vertex.name)
: 0;
newVertices.push(newNode);
});
}
return newVertices;
}, [pipeline, vertexPods]);
}, [pipeline, vertexPods, vertexRate]);

const edges = useMemo(() => {
const newEdges: Edge[] = [];
Expand Down

0 comments on commit 0dab55d

Please sign in to comment.