fix: pipeline view fix (#755)
Signed-off-by: veds-g <guptavedant2312@gmail.com>
veds-g authored and whynowy committed May 30, 2023
1 parent 7cb399e commit 67277b7
Showing 17 changed files with 196 additions and 200 deletions.
12 changes: 3 additions & 9 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -173,15 +173,9 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
Name: vertexName,
},
}
podNum := int64(1)
// for now only reduce has parallelism might have to modify later
// checking parallelism for a vertex to identify reduce vertex
// replicas will have parallelism for reduce vertex else will be nil
// parallelism indicates replica count ~ multiple pods for a vertex here
obj := ps.pipeline.GetFromEdges(req.GetVertex())
if len(obj) > 0 && obj[0].DeprecatedParallelism != nil {
podNum = int64(*obj[0].DeprecatedParallelism)
}
// partitions indicates replica count ~ multiple pods for a vertex here
obj := ps.pipeline.NumOfPartitions(req.GetVertex())
podNum := int64(obj)

var vertexLevelRates map[string]float64

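The net effect of this hunk is that the pod count used for the rate calculation now comes from the vertex's partition count rather than the deprecated edge parallelism. NumOfPartitions is assumed to resolve the declared partition count and fall back to 1 when none is set; below is a minimal sketch of that assumed contract, not the actual implementation:

```go
import "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"

// numOfPartitions sketches the assumed contract of Pipeline.NumOfPartitions:
// return the vertex's declared partition count, defaulting to 1 when unset.
// The standalone helper and its name are illustrative; the real method lives
// on the Pipeline type.
func numOfPartitions(pl *v1alpha1.Pipeline, vertexName string) int {
	if v := pl.GetVertex(vertexName); v != nil && v.Partitions != nil {
		return int(*v.Partitions)
	}
	return 1
}
```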
5 changes: 4 additions & 1 deletion pkg/daemon/server/service/pipeline_metrics_query_test.go
@@ -68,14 +68,17 @@ func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffe
return nil
}

func (ms *mockIsbSvcClient) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) {
func (ms *mockIsbSvcClient) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) {
return nil, nil
}

func TestGetVertexMetrics(t *testing.T) {
pipelineName := "simple-pipeline"
vertexName := "cat"
vertexPartition := int32(1)
pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: pipelineName},
Spec: v1alpha1.PipelineSpec{Vertices: []v1alpha1.AbstractVertex{{Name: vertexName, Partitions: &vertexPartition}}},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline, nil, nil, false)
6 changes: 3 additions & 3 deletions pkg/daemon/server/service/pipeline_watermark_query.go
@@ -38,13 +38,13 @@ func GetEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline,
}

for _, edge := range pipeline.ListAllEdges() {
var wmFetcherList []fetch.Fetcher
bucketName := v1alpha1.GenerateEdgeBucketName(pipeline.Namespace, pipeline.Name, edge.From, edge.To)
fetchWatermark, err := isbSvcClient.CreateWatermarkFetcher(ctx, bucketName)
partitions := pipeline.NumOfPartitions(edge.To)
isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
wmFetcherList, err := isbSvcClient.CreateWatermarkFetcher(ctx, bucketName, partitions, isReduce)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
}
wmFetcherList = append(wmFetcherList, fetchWatermark)
wmFetchers[edge.From+"-"+edge.To] = wmFetcherList
}
return wmFetchers, nil
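Because the rendered diff interleaves removed and added lines, here is how the edge loop reads after this change, reconstructed from the hunk above (imports and the surrounding function are elided):

```go
for _, edge := range pipeline.ListAllEdges() {
	bucketName := v1alpha1.GenerateEdgeBucketName(pipeline.Namespace, pipeline.Name, edge.From, edge.To)
	// One watermark fetcher per partition of the destination vertex; the reduce
	// flag is passed through so each partition's processor manager is set up accordingly.
	partitions := pipeline.NumOfPartitions(edge.To)
	isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
	wmFetcherList, err := isbSvcClient.CreateWatermarkFetcher(ctx, bucketName, partitions, isReduce)
	if err != nil {
		return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
	}
	wmFetchers[edge.From+"-"+edge.To] = wmFetcherList
}
```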
2 changes: 1 addition & 1 deletion pkg/isbsvc/interface.go
@@ -28,7 +28,7 @@ type ISBService interface {
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string) error
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string) error
GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error)
CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error)
CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error)
}

// createOptions describes the options for creating buffers and buckets
34 changes: 20 additions & 14 deletions pkg/isbsvc/jetstream_service.go
@@ -279,21 +279,27 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return bufferInfo, nil
}

func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) {
hbBucketName := JetStreamProcessorBucket(bucketName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient)
if err != nil {
return nil, err
}
otBucketName := JetStreamOTBucket(bucketName)
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient)
if err != nil {
return nil, err
// TODO: revisit this when working on multi partitions
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) {
var watermarkFetchers []fetch.Fetcher
for i := 0; i < partitions; i++ {
hbBucketName := JetStreamProcessorBucket(bucketName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient)
// should we return error if one is successful?
if err != nil {
return nil, err
}
otBucketName := JetStreamOTBucket(bucketName)
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient)
if err != nil {
return nil, err
}
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
}
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
pm := processor.NewProcessorManager(ctx, storeWatcher)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
return watermarkFetcher, nil
return watermarkFetchers, nil
}

func JetStreamName(bufferName string) string {
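For the same reason, here is the JetStream implementation as it reads after this change, reconstructed from the hunk above; it now builds one fetcher per partition, each with a replica-scoped processor manager:

```go
// TODO: revisit this when working on multi partitions
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) {
	var watermarkFetchers []fetch.Fetcher
	for i := 0; i < partitions; i++ {
		hbBucketName := JetStreamProcessorBucket(bucketName)
		hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient)
		if err != nil {
			return nil, err
		}
		otBucketName := JetStreamOTBucket(bucketName)
		otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient)
		if err != nil {
			return nil, err
		}
		storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
		pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
		watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
		watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
	}
	return watermarkFetchers, nil
}
```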
20 changes: 13 additions & 7 deletions pkg/isbsvc/redis_service.go
@@ -139,12 +139,18 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe
return bufferInfo, nil
}

func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string) (fetch.Fetcher, error) {
// TODO: revisit this when working on multi partitions
func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.Fetcher, error) {
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
hbWatcher := noop.NewKVOpWatch()
otWatcher := noop.NewKVOpWatch()
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher)
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
return watermarkFetcher, nil
var watermarkFetchers []fetch.Fetcher
for i := 0; i < partitions; i++ {
hbWatcher := noop.NewKVOpWatch()
otWatcher := noop.NewKVOpWatch()
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher, processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
}

return watermarkFetchers, nil
}
4 changes: 2 additions & 2 deletions server/apis/interface.go
@@ -30,8 +30,8 @@ type Handler interface {
PodLogs(c *gin.Context)
ListPodsMetrics(c *gin.Context)
GetPodMetrics(c *gin.Context)
ListPipelineEdges(c *gin.Context)
GetPipelineEdge(c *gin.Context)
ListPipelineBuffers(c *gin.Context)
GetVertexBuffers(c *gin.Context)
GetPipelineWatermarks(c *gin.Context)
GetPipelineStatus(c *gin.Context)
ListNamespaces(c *gin.Context)
10 changes: 5 additions & 5 deletions server/apis/v1/handler.go
@@ -220,8 +220,8 @@ func (h *handler) PodLogs(c *gin.Context) {
}
}

// ListPipelineEdges is used to provide information about all the pipeline edges
func (h *handler) ListPipelineEdges(c *gin.Context) {
// ListPipelineBuffers is used to provide buffer information about all the pipeline vertices
func (h *handler) ListPipelineBuffers(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
@@ -240,8 +240,8 @@ func (h *handler) ListPipelineEdges(c *gin.Context) {
c.JSON(http.StatusOK, l)
}

// GetPipelineEdge is used to provide information about a single pipeline edge
func (h *handler) GetPipelineEdge(c *gin.Context) {
// GetVertexBuffers is used to provide buffer information about a single pipeline vertex
func (h *handler) GetVertexBuffers(c *gin.Context) {
ns := c.Param("namespace")
pipeline := c.Param("pipeline")
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
@@ -253,7 +253,7 @@ func (h *handler) GetPipelineEdge(c *gin.Context) {
_ = client.Close()
}()
// Assume edge is the buffer name
i, err := client.GetPipelineBuffer(context.Background(), pipeline, c.Param("edge"))
i, err := client.GetPipelineBuffer(context.Background(), pipeline, c.Param("vertex"))
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
4 changes: 2 additions & 2 deletions server/routes/routes.go
@@ -55,8 +55,8 @@ func v1Routes(r gin.IRouter) {
r.GET("/namespaces/:namespace/pods/:pod/log", handler.PodLogs)
r.GET("/metrics/namespaces/:namespace/pods", handler.ListPodsMetrics)
r.GET("/metrics/namespaces/:namespace/pods/:pod", handler.GetPodMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges", handler.ListPipelineEdges)
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/buffers", handler.ListPipelineBuffers)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/buffers", handler.GetVertexBuffers)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
r.GET("/namespaces/:namespace/pipelines/:pipeline/status", handler.GetPipelineStatus)
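As a quick way to exercise the renamed endpoints, the sketch below issues GET requests against them. The paths come from the routes above; the host, port, the /api/v1 prefix, and the namespace/pipeline/vertex names are placeholders assumed for illustration, not part of this diff:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed base URL for a locally reachable Numaflow server UI; adjust to your setup.
	base := "http://localhost:8080/api/v1"
	urls := []string{
		// buffer information for all vertices of a pipeline (previously .../edges)
		base + "/namespaces/my-ns/pipelines/my-pipeline/buffers",
		// buffer information for a single vertex (previously .../edges/:edge)
		base + "/namespaces/my-ns/pipelines/my-pipeline/vertices/cat/buffers",
	}
	for _, u := range urls {
		resp, err := http.Get(u)
		if err != nil {
			fmt.Println("request failed:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Println(resp.Status, string(body))
	}
}
```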
14 changes: 7 additions & 7 deletions ui/src/components/pipeline/Pipeline.test.tsx
@@ -1,7 +1,7 @@
import {Pipeline} from "./Pipeline"
import {render, screen, waitFor} from "@testing-library/react"
import {usePipelineFetch} from "../../utils/fetchWrappers/pipelineFetch";
import {useEdgesInfoFetch} from "../../utils/fetchWrappers/edgeInfoFetch";
import {useBuffersInfoFetch} from "../../utils/fetchWrappers/bufferInfoFetch";

global.ResizeObserver = require('resize-observer-polyfill')

@@ -16,8 +16,8 @@ jest.mock("react-router-dom", () => ({
jest.mock("../../utils/fetchWrappers/pipelineFetch");
const mockedUsePipelineFetch = usePipelineFetch as jest.MockedFunction<typeof usePipelineFetch>;

jest.mock("../../utils/fetchWrappers/edgeInfoFetch");
const mockedUseEdgesInfoFetch = useEdgesInfoFetch as jest.MockedFunction<typeof useEdgesInfoFetch>;
jest.mock("../../utils/fetchWrappers/bufferInfoFetch");
const mockedUseBuffersInfoFetch = useBuffersInfoFetch as jest.MockedFunction<typeof useBuffersInfoFetch>;

describe("Pipeline", () => {
it("Load Graph screen", async () => {
Expand Down Expand Up @@ -107,8 +107,8 @@ describe("Pipeline", () => {
}
}, error: false, loading: false
});
mockedUseEdgesInfoFetch.mockReturnValue({
edgesInfo: [{
mockedUseBuffersInfoFetch.mockReturnValue({
buffersInfo: [{
"fromVertex": "input",
"toVertex": "preproc",
"pendingCount": 8133,
@@ -262,8 +262,8 @@ describe("Pipeline", () => {
}
}, error: "error", loading: false
});
mockedUseEdgesInfoFetch.mockReturnValue({
edgesInfo: [{
mockedUseBuffersInfoFetch.mockReturnValue({
buffersInfo: [{
"fromVertex": "input",
"toVertex": "preproc",
"pendingCount": 8133,
93 changes: 47 additions & 46 deletions ui/src/components/pipeline/Pipeline.tsx
@@ -3,7 +3,7 @@ import { useParams } from "react-router-dom";
import { Edge, Node } from "reactflow";
import Graph from "./graph/Graph";
import { usePipelineFetch } from "../../utils/fetchWrappers/pipelineFetch";
import { useEdgesInfoFetch } from "../../utils/fetchWrappers/edgeInfoFetch";
import { useBuffersInfoFetch } from "../../utils/fetchWrappers/bufferInfoFetch";
import { VertexMetrics, EdgeWatermark } from "../../utils/models/pipeline";
import "./Pipeline.css";
import { notifyError } from "../../utils/error";
@@ -47,8 +47,8 @@ export function Pipeline() {
useEffect(() => {
if (pipelineError){
notifyError([{
error: "Failed to fetch the pipeline vertices",
options: {toastId: "pl-vertex", autoClose: false}
error: "Failed to fetch the pipeline details",
options: {toastId: "pl-details", autoClose: false}
}]);
}
}, [pipelineError]);
@@ -63,21 +63,21 @@
};
}, []);

const { edgesInfo, error: edgesInfoError } = useEdgesInfoFetch(
const { buffersInfo, error: buffersInfoError } = useBuffersInfoFetch(
namespaceId,
pipelineId,
edgesInfoRequestKey
);

// This useEffect notifies about the errors while querying for the edges of the pipeline
useEffect(() => {
if (edgesInfoError) {
if (buffersInfoError) {
notifyError([{
error: "Failed to fetch the pipeline edges",
options: {toastId: "pl-edge", autoClose: false}
error: "Failed to fetch the pipeline buffers",
options: {toastId: "pl-buffer", autoClose: false}
}]);
}
}, [edgesInfoError]);
}, [buffersInfoError]);

useEffect(() => {
// Refresh edgesInfo every x ms
@@ -283,7 +283,8 @@ export function Pipeline() {
if (
pipeline?.spec?.vertices &&
vertexPods &&
vertexMetrics
vertexMetrics &&
buffersInfo
) {
pipeline.spec.vertices.map((vertex) => {
const newNode = {} as Node;
@@ -307,71 +308,71 @@
newNode.data.vertexMetrics = vertexMetrics.has(vertex.name)
? vertexMetrics.get(vertex.name)
: null;
newNode.data.buffers = [];
buffersInfo?.forEach((buffer) => {
const sidx = `${pipeline?.metadata?.namespace}-${pipeline?.metadata?.name}-`.length;
const eidx = buffer?.bufferName?.lastIndexOf("-");
const bufferName = buffer?.bufferName?.substring(sidx, eidx);
if (vertex.name === bufferName) {
newNode?.data?.buffers.push(buffer);
}
});
if (newNode.data.buffers.length === 0) newNode.data.buffers = null;
newVertices.push(newNode);
});
}
return newVertices;
}, [pipeline, vertexPods, vertexMetrics]);
}, [pipeline, vertexPods, vertexMetrics, buffersInfo]);

const edges = useMemo(() => {
const newEdges: Edge[] = [];
if (
pipeline?.spec?.edges &&
edgesInfo &&
buffersInfo &&
edgeWatermark
) {
// for an edge it is the sum of backpressure between vertices - the value we see on the edge
// map from edge-id( from-Vertex - to-Vertex ) to sum of backpressure
const edgeBackpressureLabel = new Map();

edgesInfo.forEach((edge) => {
const id = edge.fromVertex + "-" + edge.toVertex;
if (edgeBackpressureLabel.get(id) === undefined) edgeBackpressureLabel.set(id, Number(edge.totalMessages));
else edgeBackpressureLabel.set(id, edgeBackpressureLabel.get(id) + Number(edge.totalMessages));
buffersInfo.forEach((buffer) => {
const sidx = `${pipeline.metadata.namespace}-${pipeline.metadata.name}-`.length;
const eidx = buffer.bufferName.lastIndexOf("-");
const id = buffer.bufferName.substring(sidx, eidx);
if (edgeBackpressureLabel.get(id) === undefined) edgeBackpressureLabel.set(id, Number(buffer.totalMessages));
else edgeBackpressureLabel.set(id, edgeBackpressureLabel.get(id) + Number(buffer.totalMessages));
});

pipeline.spec.edges.map((edge) => {
edgesInfo.map((edgeInfo) => {
if (
edgeInfo.fromVertex === edge.from &&
edgeInfo.toVertex === edge.to
) {
const id = edge.from + "-" + edge.to;
const pipelineEdge = {
id,
source: edge.from,
target: edge.to,
data: {
...edgeInfo,
conditions: edge.conditions,
pending: edgeInfo.pendingCount,
ackPending: edgeInfo.ackPendingCount,
bufferLength: edgeInfo.bufferLength,
isFull: edgeInfo.isFull,
backpressureLabel: edgeBackpressureLabel.get(id),
},
} as Edge;
pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id)
? edgeWatermark.get(pipelineEdge.id)
: null;
pipelineEdge.animated = true;
pipelineEdge.type = 'custom';
newEdges.push(pipelineEdge);
}
});
const id = edge.from + "-" + edge.to;
const pipelineEdge = {
id,
source: edge.from,
target: edge.to,
data: {
conditions: edge.conditions,
backpressureLabel: edgeBackpressureLabel.get(edge.to),
},
} as Edge;
pipelineEdge.data.edgeWatermark = edgeWatermark.has(pipelineEdge.id)
? edgeWatermark.get(pipelineEdge.id)
: null;
pipelineEdge.animated = true;
pipelineEdge.type = 'custom';
newEdges.push(pipelineEdge);
});
}
return newEdges;
}, [pipeline, edgesInfo, edgeWatermark]);
}, [pipeline, buffersInfo, edgeWatermark]);

if (pipelineError || edgesInfoError) {
if (pipelineError || buffersInfoError) {
return <div>Error</div>;
}

return (
<div data-testid={"pipeline"} style={{ overflow: "scroll !important" }}>
{pipeline?.spec &&
edgesInfo.length > 0 &&
buffersInfo.length > 0 &&
edges.length > 0 &&
vertices.length > 0 && (
<Graph
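The UI side now derives each buffer's owning vertex from the buffer name itself, assuming names of the form <namespace>-<pipeline>-<vertex>-<index>. Below is a minimal Go sketch of the same parsing the TSX above performs; the naming assumption and the sample name are illustrative, not taken from this diff:

```go
package main

import (
	"fmt"
	"strings"
)

// vertexFromBufferName mirrors the substring logic in Pipeline.tsx: strip the
// "<namespace>-<pipeline>-" prefix, then drop the trailing "-<index>" suffix.
func vertexFromBufferName(namespace, pipeline, bufferName string) string {
	name := strings.TrimPrefix(bufferName, namespace+"-"+pipeline+"-")
	if i := strings.LastIndex(name, "-"); i >= 0 {
		return name[:i]
	}
	return name
}

func main() {
	// Hypothetical buffer name; real names are generated by the controller.
	fmt.Println(vertexFromBufferName("my-ns", "simple-pipeline", "my-ns-simple-pipeline-cat-0")) // prints "cat"
}
```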
