From 67277b794766dadc1a77bc77baa70277fdacf07c Mon Sep 17 00:00:00 2001 From: Vedant Gupta <49195734+veds-g@users.noreply.github.com> Date: Tue, 30 May 2023 12:18:30 +0530 Subject: [PATCH] fix: pipeline view fix (#755) Signed-off-by: veds-g --- .../server/service/pipeline_metrics_query.go | 12 +-- .../service/pipeline_metrics_query_test.go | 5 +- .../service/pipeline_watermark_query.go | 6 +- pkg/isbsvc/interface.go | 2 +- pkg/isbsvc/jetstream_service.go | 34 ++++--- pkg/isbsvc/redis_service.go | 20 ++-- server/apis/interface.go | 4 +- server/apis/v1/handler.go | 10 +- server/routes/routes.go | 4 +- ui/src/components/pipeline/Pipeline.test.tsx | 14 +-- ui/src/components/pipeline/Pipeline.tsx | 93 ++++++++++--------- .../pipeline/edgeinfo/EdgeInfo.test.tsx | 4 - .../components/pipeline/edgeinfo/EdgeInfo.tsx | 90 ++---------------- .../components/pipeline/nodeinfo/NodeInfo.tsx | 69 ++++++++++++++ ...oFetch.test.ts => bufferInfoFetch.test.ts} | 12 +-- .../{edgeInfoFetch.ts => bufferInfoFetch.ts} | 12 +-- ui/src/utils/models/pipeline.ts | 5 +- 17 files changed, 196 insertions(+), 200 deletions(-) rename ui/src/utils/fetchWrappers/{edgeInfoFetch.test.ts => bufferInfoFetch.test.ts} (76%) rename ui/src/utils/fetchWrappers/{edgeInfoFetch.ts => bufferInfoFetch.ts} (63%) diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go index 181c4048b..c05a9f547 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query.go +++ b/pkg/daemon/server/service/pipeline_metrics_query.go @@ -173,15 +173,9 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem Name: vertexName, }, } - podNum := int64(1) - // for now only reduce has parallelism might have to modify later - // checking parallelism for a vertex to identify reduce vertex - // replicas will have parallelism for reduce vertex else will be nil - // parallelism indicates replica count ~ multiple pods for a vertex here - obj := ps.pipeline.GetFromEdges(req.GetVertex()) - if len(obj) > 0 && obj[0].DeprecatedParallelism != nil { - podNum = int64(*obj[0].DeprecatedParallelism) - } + // partitions indicates replica count ~ multiple pods for a vertex here + obj := ps.pipeline.NumOfPartitions(req.GetVertex()) + podNum := int64(obj) var vertexLevelRates map[string]float64 diff --git a/pkg/daemon/server/service/pipeline_metrics_query_test.go b/pkg/daemon/server/service/pipeline_metrics_query_test.go index 5aa593468..fcfaca39d 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query_test.go +++ b/pkg/daemon/server/service/pipeline_metrics_query_test.go @@ -68,14 +68,17 @@ func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffe return nil } -func (ms *mockIsbSvcClient) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) { +func (ms *mockIsbSvcClient) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) { return nil, nil } func TestGetVertexMetrics(t *testing.T) { pipelineName := "simple-pipeline" + vertexName := "cat" + vertexPartition := int32(1) pipeline := &v1alpha1.Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: pipelineName}, + Spec: v1alpha1.PipelineSpec{Vertices: []v1alpha1.AbstractVertex{{Name: vertexName, Partitions: &vertexPartition}}}, } client, _ := isbsvc.NewISBJetStreamSvc(pipelineName) pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline, nil, nil, false) diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go index c10af1373..db9a76d12 100644 --- a/pkg/daemon/server/service/pipeline_watermark_query.go +++ b/pkg/daemon/server/service/pipeline_watermark_query.go @@ -38,13 +38,13 @@ func GetEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline, } for _, edge := range pipeline.ListAllEdges() { - var wmFetcherList []fetch.Fetcher bucketName := v1alpha1.GenerateEdgeBucketName(pipeline.Namespace, pipeline.Name, edge.From, edge.To) - fetchWatermark, err := isbSvcClient.CreateWatermarkFetcher(ctx, bucketName) + partitions := pipeline.NumOfPartitions(edge.To) + isReduce := pipeline.GetVertex(edge.To).IsReduceUDF() + wmFetcherList, err := isbSvcClient.CreateWatermarkFetcher(ctx, bucketName, partitions, isReduce) if err != nil { return nil, fmt.Errorf("failed to create watermark fetcher %w", err) } - wmFetcherList = append(wmFetcherList, fetchWatermark) wmFetchers[edge.From+"-"+edge.To] = wmFetcherList } return wmFetchers, nil diff --git a/pkg/isbsvc/interface.go b/pkg/isbsvc/interface.go index 3726b98c7..6ea26f182 100644 --- a/pkg/isbsvc/interface.go +++ b/pkg/isbsvc/interface.go @@ -28,7 +28,7 @@ type ISBService interface { DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string) error ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string) error GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error) - CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) + CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) } // createOptions describes the options for creating buffers and buckets diff --git a/pkg/isbsvc/jetstream_service.go b/pkg/isbsvc/jetstream_service.go index d75f3ee9a..5b03b2672 100644 --- a/pkg/isbsvc/jetstream_service.go +++ b/pkg/isbsvc/jetstream_service.go @@ -279,21 +279,27 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf return bufferInfo, nil } -func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) { - hbBucketName := JetStreamProcessorBucket(bucketName) - hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient) - if err != nil { - return nil, err - } - otBucketName := JetStreamOTBucket(bucketName) - otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient) - if err != nil { - return nil, err +// TODO: revisit this when working on multi partitions +func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) { + var watermarkFetchers []fetch.Fetcher + for i := 0; i < partitions; i++ { + hbBucketName := JetStreamProcessorBucket(bucketName) + hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient) + // should we return error if one is successful? + if err != nil { + return nil, err + } + otBucketName := JetStreamOTBucket(bucketName) + otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient) + if err != nil { + return nil, err + } + storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) + pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm) + watermarkFetchers = append(watermarkFetchers, watermarkFetcher) } - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) - pm := processor.NewProcessorManager(ctx, storeWatcher) - watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm) - return watermarkFetcher, nil + return watermarkFetchers, nil } func JetStreamName(bufferName string) string { diff --git a/pkg/isbsvc/redis_service.go b/pkg/isbsvc/redis_service.go index 6d5c7b178..c5836c0d0 100644 --- a/pkg/isbsvc/redis_service.go +++ b/pkg/isbsvc/redis_service.go @@ -139,12 +139,18 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe return bufferInfo, nil } -func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) { +// TODO: revisit this when working on multi partitions +func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) { // Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher. - hbWatcher := noop.NewKVOpWatch() - otWatcher := noop.NewKVOpWatch() - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - pm := processor.NewProcessorManager(ctx, storeWatcher) - watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm) - return watermarkFetcher, nil + var watermarkFetchers []fetch.Fetcher + for i := 0; i < partitions; i++ { + hbWatcher := noop.NewKVOpWatch() + otWatcher := noop.NewKVOpWatch() + storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) + pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm) + watermarkFetchers = append(watermarkFetchers, watermarkFetcher) + } + + return watermarkFetchers, nil } diff --git a/server/apis/interface.go b/server/apis/interface.go index 7158bf2ae..da54d8635 100644 --- a/server/apis/interface.go +++ b/server/apis/interface.go @@ -30,8 +30,8 @@ type Handler interface { PodLogs(c *gin.Context) ListPodsMetrics(c *gin.Context) GetPodMetrics(c *gin.Context) - ListPipelineEdges(c *gin.Context) - GetPipelineEdge(c *gin.Context) + ListPipelineBuffers(c *gin.Context) + GetVertexBuffers(c *gin.Context) GetPipelineWatermarks(c *gin.Context) GetPipelineStatus(c *gin.Context) ListNamespaces(c *gin.Context) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index cc5c7cc65..198c66a91 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -220,8 +220,8 @@ func (h *handler) PodLogs(c *gin.Context) { } } -// ListPipelineEdges is used to provide information about all the pipeline edges -func (h *handler) ListPipelineEdges(c *gin.Context) { +// ListPipelineBuffers is used to provide buffer information about all the pipeline vertices +func (h *handler) ListPipelineBuffers(c *gin.Context) { ns := c.Param("namespace") pipeline := c.Param("pipeline") client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) @@ -240,8 +240,8 @@ func (h *handler) ListPipelineEdges(c *gin.Context) { c.JSON(http.StatusOK, l) } -// GetPipelineEdge is used to provide information about a single pipeline edge -func (h *handler) GetPipelineEdge(c *gin.Context) { +// GetVertexBuffers is used to provide buffer information about a single pipeline vertex +func (h *handler) GetVertexBuffers(c *gin.Context) { ns := c.Param("namespace") pipeline := c.Param("pipeline") client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) @@ -253,7 +253,7 @@ func (h *handler) GetPipelineEdge(c *gin.Context) { _ = client.Close() }() // Assume edge is the buffer name - i, err := client.GetPipelineBuffer(context.Background(), pipeline, c.Param("edge")) + i, err := client.GetPipelineBuffer(context.Background(), pipeline, c.Param("vertex")) if err != nil { c.JSON(http.StatusInternalServerError, err.Error()) return diff --git a/server/routes/routes.go b/server/routes/routes.go index 31120c8ac..a0cbebcc7 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -55,8 +55,8 @@ func v1Routes(r gin.IRouter) { r.GET("/namespaces/:namespace/pods/:pod/log", handler.PodLogs) r.GET("/metrics/namespaces/:namespace/pods", handler.ListPodsMetrics) r.GET("/metrics/namespaces/:namespace/pods/:pod", handler.GetPodMetrics) - r.GET("/namespaces/:namespace/pipelines/:pipeline/edges", handler.ListPipelineEdges) - r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge) + r.GET("/namespaces/:namespace/pipelines/:pipeline/buffers", handler.ListPipelineBuffers) + r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/buffers", handler.GetVertexBuffers) r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics) r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks) r.GET("/namespaces/:namespace/pipelines/:pipeline/status", handler.GetPipelineStatus) diff --git a/ui/src/components/pipeline/Pipeline.test.tsx b/ui/src/components/pipeline/Pipeline.test.tsx index aacff1065..57ec0f0cd 100644 --- a/ui/src/components/pipeline/Pipeline.test.tsx +++ b/ui/src/components/pipeline/Pipeline.test.tsx @@ -1,7 +1,7 @@ import {Pipeline} from "./Pipeline" import {render, screen, waitFor} from "@testing-library/react" import {usePipelineFetch} from "../../utils/fetchWrappers/pipelineFetch"; -import {useEdgesInfoFetch} from "../../utils/fetchWrappers/edgeInfoFetch"; +import {useBuffersInfoFetch} from "../../utils/fetchWrappers/bufferInfoFetch"; global.ResizeObserver = require('resize-observer-polyfill') @@ -16,8 +16,8 @@ jest.mock("react-router-dom", () => ({ jest.mock("../../utils/fetchWrappers/pipelineFetch"); const mockedUsePipelineFetch = usePipelineFetch as jest.MockedFunction; -jest.mock("../../utils/fetchWrappers/edgeInfoFetch"); -const mockedUseEdgesInfoFetch = useEdgesInfoFetch as jest.MockedFunction; +jest.mock("../../utils/fetchWrappers/bufferInfoFetch"); +const mockedUseBuffersInfoFetch = useBuffersInfoFetch as jest.MockedFunction; describe("Pipeline", () => { it("Load Graph screen", async () => { @@ -107,8 +107,8 @@ describe("Pipeline", () => { } }, error: false, loading: false }); - mockedUseEdgesInfoFetch.mockReturnValue({ - edgesInfo: [{ + mockedUseBuffersInfoFetch.mockReturnValue({ + buffersInfo: [{ "fromVertex": "input", "toVertex": "preproc", "pendingCount": 8133, @@ -262,8 +262,8 @@ describe("Pipeline", () => { } }, error: "error", loading: false }); - mockedUseEdgesInfoFetch.mockReturnValue({ - edgesInfo: [{ + mockedUseBuffersInfoFetch.mockReturnValue({ + buffersInfo: [{ "fromVertex": "input", "toVertex": "preproc", "pendingCount": 8133, diff --git a/ui/src/components/pipeline/Pipeline.tsx b/ui/src/components/pipeline/Pipeline.tsx index 594c8d014..5fb15b6ec 100644 --- a/ui/src/components/pipeline/Pipeline.tsx +++ b/ui/src/components/pipeline/Pipeline.tsx @@ -3,7 +3,7 @@ import { useParams } from "react-router-dom"; import { Edge, Node } from "reactflow"; import Graph from "./graph/Graph"; import { usePipelineFetch } from "../../utils/fetchWrappers/pipelineFetch"; -import { useEdgesInfoFetch } from "../../utils/fetchWrappers/edgeInfoFetch"; +import { useBuffersInfoFetch } from "../../utils/fetchWrappers/bufferInfoFetch"; import { VertexMetrics, EdgeWatermark } from "../../utils/models/pipeline"; import "./Pipeline.css"; import { notifyError } from "../../utils/error"; @@ -47,8 +47,8 @@ export function Pipeline() { useEffect(() => { if (pipelineError){ notifyError([{ - error: "Failed to fetch the pipeline vertices", - options: {toastId: "pl-vertex", autoClose: false} + error: "Failed to fetch the pipeline details", + options: {toastId: "pl-details", autoClose: false} }]); } }, [pipelineError]); @@ -63,7 +63,7 @@ export function Pipeline() { }; }, []); - const { edgesInfo, error: edgesInfoError } = useEdgesInfoFetch( + const { buffersInfo, error: buffersInfoError } = useBuffersInfoFetch( namespaceId, pipelineId, edgesInfoRequestKey @@ -71,13 +71,13 @@ export function Pipeline() { // This useEffect notifies about the errors while querying for the edges of the pipeline useEffect(() => { - if (edgesInfoError) { + if (buffersInfoError) { notifyError([{ - error: "Failed to fetch the pipeline edges", - options: {toastId: "pl-edge", autoClose: false} + error: "Failed to fetch the pipeline buffers", + options: {toastId: "pl-buffer", autoClose: false} }]); } - }, [edgesInfoError]); + }, [buffersInfoError]); useEffect(() => { // Refresh edgesInfo every x ms @@ -283,7 +283,8 @@ export function Pipeline() { if ( pipeline?.spec?.vertices && vertexPods && - vertexMetrics + vertexMetrics && + buffersInfo ) { pipeline.spec.vertices.map((vertex) => { const newNode = {} as Node; @@ -307,71 +308,71 @@ export function Pipeline() { newNode.data.vertexMetrics = vertexMetrics.has(vertex.name) ? vertexMetrics.get(vertex.name) : null; + newNode.data.buffers = []; + buffersInfo?.forEach((buffer) => { + const sidx = `${pipeline?.metadata?.namespace}-${pipeline?.metadata?.name}-`.length; + const eidx = buffer?.bufferName?.lastIndexOf("-"); + const bufferName = buffer?.bufferName?.substring(sidx, eidx); + if (vertex.name === bufferName) { + newNode?.data?.buffers.push(buffer); + } + }); + if (newNode.data.buffers.length === 0) newNode.data.buffers = null; newVertices.push(newNode); }); } return newVertices; - }, [pipeline, vertexPods, vertexMetrics]); + }, [pipeline, vertexPods, vertexMetrics, buffersInfo]); const edges = useMemo(() => { const newEdges: Edge[] = []; if ( pipeline?.spec?.edges && - edgesInfo && + buffersInfo && edgeWatermark ) { // for an edge it is the sum of backpressure between vertices - the value we see on the edge // map from edge-id( from-Vertex - to-Vertex ) to sum of backpressure const edgeBackpressureLabel = new Map(); - edgesInfo.forEach((edge) => { - const id = edge.fromVertex + "-" + edge.toVertex; - if (edgeBackpressureLabel.get(id) === undefined) edgeBackpressureLabel.set(id, Number(edge.totalMessages)); - else edgeBackpressureLabel.set(id, edgeBackpressureLabel.get(id) + Number(edge.totalMessages)); + buffersInfo.forEach((buffer) => { + const sidx = `${pipeline.metadata.namespace}-${pipeline.metadata.name}-`.length; + const eidx = buffer.bufferName.lastIndexOf("-"); + const id = buffer.bufferName.substring(sidx, eidx); + if (edgeBackpressureLabel.get(id) === undefined) edgeBackpressureLabel.set(id, Number(buffer.totalMessages)); + else edgeBackpressureLabel.set(id, edgeBackpressureLabel.get(id) + Number(buffer.totalMessages)); }); pipeline.spec.edges.map((edge) => { - edgesInfo.map((edgeInfo) => { - if ( - edgeInfo.fromVertex === edge.from && - edgeInfo.toVertex === edge.to - ) { - const id = edge.from + "-" + edge.to; - const pipelineEdge = { - id, - source: edge.from, - target: edge.to, - data: { - ...edgeInfo, - conditions: edge.conditions, - pending: edgeInfo.pendingCount, - ackPending: edgeInfo.ackPendingCount, - bufferLength: edgeInfo.bufferLength, - isFull: edgeInfo.isFull, - backpressureLabel: edgeBackpressureLabel.get(id), - }, - } as Edge; - pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id) - ? edgeWatermark.get(pipelineEdge.id) - : null; - pipelineEdge.animated = true; - pipelineEdge.type = 'custom'; - newEdges.push(pipelineEdge); - } - }); + const id = edge.from + "-" + edge.to; + const pipelineEdge = { + id, + source: edge.from, + target: edge.to, + data: { + conditions: edge.conditions, + backpressureLabel: edgeBackpressureLabel.get(edge.to), + }, + } as Edge; + pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id) + ? edgeWatermark.get(pipelineEdge.id) + : null; + pipelineEdge.animated = true; + pipelineEdge.type = 'custom'; + newEdges.push(pipelineEdge); }); } return newEdges; - }, [pipeline, edgesInfo, edgeWatermark]); + }, [pipeline, buffersInfo, edgeWatermark]); - if (pipelineError || edgesInfoError) { + if (pipelineError || buffersInfoError) { return
Error
; } return (
{pipeline?.spec && - edgesInfo.length > 0 && + buffersInfo.length > 0 && edges.length > 0 && vertices.length > 0 && ( { it("loads", () => { render(); expect(screen.getByTestId("conditions")).toBeVisible(); - expect(screen.getByTestId("isFull")).toBeVisible(); - expect(screen.getByTestId("pending")).toBeVisible(); - expect(screen.getByTestId("usage")).toBeVisible(); - expect(screen.getByTestId("bufferLength")).toBeVisible(); }); }); diff --git a/ui/src/components/pipeline/edgeinfo/EdgeInfo.tsx b/ui/src/components/pipeline/edgeinfo/EdgeInfo.tsx index fd7c5a60f..11146bb69 100644 --- a/ui/src/components/pipeline/edgeinfo/EdgeInfo.tsx +++ b/ui/src/components/pipeline/edgeinfo/EdgeInfo.tsx @@ -30,7 +30,7 @@ export default function EdgeInfo(props: EdgeInfoProps) { setValue(newValue); }; - const label = `${edge?.id} Buffer`; + const label = `${edge?.id} Edge`; return ( @@ -45,16 +45,6 @@ export default function EdgeInfo(props: EdgeInfoProps) { onChange={handleChange} aria-label={`${label}-details`} > - {edge?.data && ( - - )} {edge?.data?.edgeWatermark && ( )} {edge?.data?.conditions && ( @@ -72,85 +62,17 @@ export default function EdgeInfo(props: EdgeInfoProps) { }} data-testid="conditions" label="Conditions" - {...a11yProps(2)} + {...a11yProps(1)} /> )} - {edge?.data && ( - - - - - - Edge - isFull - AckPending - Pending - Buffer Length - Buffer Usage - Total Messages - - - - {edges.map((singleEdge, idx) => { - if ( - singleEdge?.source == edge.data?.fromVertex && - singleEdge?.target == edge.data?.toVertex - ) { - let isFull; - if (singleEdge?.data?.isFull) { - isFull = "yes"; - } else { - isFull = "no"; - } - let bufferUsage = ""; - if (typeof singleEdge?.data?.bufferUsage !== "undefined") { - bufferUsage = ( - singleEdge?.data?.bufferUsage * 100 - ).toFixed(2); - } - return ( - - - {singleEdge?.data?.bufferName.slice( - singleEdge.data.bufferName.indexOf("-") + 1 - )} - - {isFull} - - {singleEdge?.data?.ackPending} - - - {singleEdge?.data?.pending} - - - {singleEdge?.data?.bufferLength} - - - {bufferUsage}% - - - {singleEdge?.data?.totalMessages} - - - ); - } - })} - -
-
-
- )} {edge?.data?.edgeWatermark && ( - + @@ -176,7 +98,7 @@ export default function EdgeInfo(props: EdgeInfoProps) { )} {edge?.data?.conditions && ( - + {edges.map((singleEdge, idx) => { if ( singleEdge?.data?.conditions && diff --git a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx index 3e1b6628a..f1ef093ce 100644 --- a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx +++ b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx @@ -75,6 +75,14 @@ export default function NodeInfo(props: NodeInfoProps) { {...a11yProps(2)} /> )} + {node?.data?.buffers && ( + + )} {node?.id && ( @@ -155,6 +163,67 @@ export default function NodeInfo(props: NodeInfoProps) { )} )} + {node?.data?.buffers && ( + + +
+ + + Partition + isFull + AckPending + Pending + Buffer Length + Buffer Usage + Total Pending Messages + + + + {node.data.buffers.map((buffer, idx) => { + let isFull; + if (buffer?.isFull) { + isFull = "yes"; + } else { + isFull = "no"; + } + let bufferUsage = ""; + if (typeof buffer?.bufferUsage !== "undefined") { + bufferUsage = ( + buffer?.bufferUsage * 100 + ).toFixed(2); + } + return ( + + + {buffer?.bufferName} + + {isFull} + + {buffer?.ackPendingCount} + + + {buffer?.pendingCount} + + + {buffer?.bufferLength} + + + {bufferUsage}% + + + {buffer?.totalMessages} + + + ); + })} + +
+
+
+ )} ); } diff --git a/ui/src/utils/fetchWrappers/edgeInfoFetch.test.ts b/ui/src/utils/fetchWrappers/bufferInfoFetch.test.ts similarity index 76% rename from ui/src/utils/fetchWrappers/edgeInfoFetch.test.ts rename to ui/src/utils/fetchWrappers/bufferInfoFetch.test.ts index 88b48e7b1..6c168eeac 100644 --- a/ui/src/utils/fetchWrappers/edgeInfoFetch.test.ts +++ b/ui/src/utils/fetchWrappers/bufferInfoFetch.test.ts @@ -1,12 +1,12 @@ import {useFetch} from "./fetch"; import {renderHook} from "@testing-library/react"; -import {useEdgesInfoFetch} from "./edgeInfoFetch"; +import {useBuffersInfoFetch} from "./bufferInfoFetch"; jest.mock("../fetchWrappers/fetch"); const mockedUseFetch = useFetch as jest.MockedFunction; -describe("edgeInfoFetch test", () => { +describe("bufferInfoFetch test", () => { const data = { "pipeline": "simple-pipeline", "fromVertex": "input", @@ -22,8 +22,8 @@ describe("edgeInfoFetch test", () => { } it("edgesInfo return", () => { mockedUseFetch.mockReturnValue({data: data, error: false, loading: false}) - const {result} = renderHook(() => useEdgesInfoFetch("numaflow-system", "simple-pipeline", "")) - expect(result.current.edgesInfo).toEqual({ + const {result} = renderHook(() => useBuffersInfoFetch("numaflow-system", "simple-pipeline", "")) + expect(result.current.buffersInfo).toEqual({ "ackPendingCount": 0, "bufferLength": 10000, "bufferName": "numaflow-system-simple-pipeline-postproc-publisher", @@ -40,13 +40,13 @@ describe("edgeInfoFetch test", () => { it("edgesInfo loading", () => { mockedUseFetch.mockReturnValue({data: data, error: false, loading: true}); - const {result} = renderHook(() => useEdgesInfoFetch("numaflow-system", "simple-pipeline", "")) + const {result} = renderHook(() => useBuffersInfoFetch("numaflow-system", "simple-pipeline", "")) expect(result.current.loading).toBeTruthy() }) it("edgesInfo error", () => { mockedUseFetch.mockReturnValue({data: data, error: true, loading: false}) - const {result} = renderHook(() => useEdgesInfoFetch("numaflow-system", "simple-pipeline", "")) + const {result} = renderHook(() => useBuffersInfoFetch("numaflow-system", "simple-pipeline", "")) expect(result.current.error).toBeTruthy() }) }) diff --git a/ui/src/utils/fetchWrappers/edgeInfoFetch.ts b/ui/src/utils/fetchWrappers/bufferInfoFetch.ts similarity index 63% rename from ui/src/utils/fetchWrappers/edgeInfoFetch.ts rename to ui/src/utils/fetchWrappers/bufferInfoFetch.ts index a17176c89..b155bc7f6 100644 --- a/ui/src/utils/fetchWrappers/edgeInfoFetch.ts +++ b/ui/src/utils/fetchWrappers/bufferInfoFetch.ts @@ -1,13 +1,13 @@ import { useEffect, useState } from "react"; -import { EdgeInfo } from "../models/pipeline"; +import { BufferInfo } from "../models/pipeline"; import { useFetch } from "./fetch"; -export const useEdgesInfoFetch = ( +export const useBuffersInfoFetch = ( namespaceId: string | undefined, pipelineId: string | undefined, requestKey: string ) => { - const [edgesInfo, setEdgesInfo] = useState([]); + const [buffersInfo, setBuffersInfo] = useState([]); const [loading, setLoading] = useState(true); const { @@ -15,7 +15,7 @@ export const useEdgesInfoFetch = ( loading: fetchLoading, error, } = useFetch( - `/api/v1/namespaces/${namespaceId}/pipelines/${pipelineId}/edges?refreshKey=${requestKey}` + `/api/v1/namespaces/${namespaceId}/pipelines/${pipelineId}/buffers?refreshKey=${requestKey}` ); useEffect(() => { @@ -28,11 +28,11 @@ export const useEdgesInfoFetch = ( return; } if (data) { - setEdgesInfo(data); + setBuffersInfo(data); setLoading(false); return; } }, [data, fetchLoading]); - return { edgesInfo, error, loading }; + return { buffersInfo, error, loading }; }; diff --git a/ui/src/utils/models/pipeline.ts b/ui/src/utils/models/pipeline.ts index 17e5c5496..9bd55c54b 100644 --- a/ui/src/utils/models/pipeline.ts +++ b/ui/src/utils/models/pipeline.ts @@ -13,9 +13,8 @@ export interface EdgeWatermark { isWaterMarkEnabled: boolean; } -export interface EdgeInfo { - fromVertex: string; - toVertex: string; +export interface BufferInfo { + bufferName: string; ackPendingCount: number; pendingCount: number; totalMessages: number;