Skip to content

Commit

Permalink
feat: Expose watermark over HTTP (#120)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
kohlisid authored and whynowy committed Aug 6, 2022
1 parent c09502a commit ffd38a1
Show file tree
Hide file tree
Showing 12 changed files with 1,250 additions and 128 deletions.
1,054 changes: 941 additions & 113 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

123 changes: 123 additions & 0 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ message GetVertexMetricsResponse {
required VertexMetrics vertex = 1;
}

/* Watermark */
// VertexWatermark has vertex to watermark mapping.
message VertexWatermark {
required string pipeline = 1;
required string vertex = 2;
required int64 watermark = 3;
required bool isWatermarkEnabled = 4;
}

message GetVertexWatermarkResponse {
required VertexWatermark vertexWatermark = 1;
}

// GetVertexWatermarksRequest requests for the watermark for a pipeline's vertex.
message GetVertexWatermarkRequest {
required string pipeline = 1;
required string vertex = 2;
}

// DaemonService is a grpc service that is used to provide APIs for giving any pipeline information.
service DaemonService {

Expand All @@ -72,4 +91,9 @@ service DaemonService {
rpc GetVertexMetrics (GetVertexMetricsRequest) returns (GetVertexMetricsResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/metrics";
};

// GetVertexWatermark return the watermark of the given vertex based on the incoming edge buffer
rpc GetVertexWatermark (GetVertexWatermarkRequest) returns (GetVertexWatermarkResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/watermark";
};
}
12 changes: 12 additions & 0 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@ func (dc *DaemonClient) GetVertexMetrics(ctx context.Context, pipeline, vertex s
return rspn.Vertex, nil
}
}

// GetVertexWatermark returns the VertexWatermark response instance for GetVertexWatermarkRequest
func (dc *DaemonClient) GetVertexWatermark(ctx context.Context, pipeline, vertex string) (*daemon.VertexWatermark, error) {
if rspn, err := dc.client.GetVertexWatermark(ctx, &daemon.GetVertexWatermarkRequest{
Pipeline: &pipeline,
Vertex: &vertex,
}); err != nil {
return nil, err
} else {
return rspn.VertexWatermark, nil
}
}
2 changes: 1 addition & 1 deletion pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) *grpc.Serv
}
grpcServer := grpc.NewServer(sOpts...)
grpc_prometheus.Register(grpcServer)
daemon.RegisterDaemonServiceServer(grpcServer, service.NewPipelineMetricsQueryService(isbSvcClient, ds.pipeline))
daemon.RegisterDaemonServiceServer(grpcServer, service.NewPipelineMetadataQuery(isbSvcClient, ds.pipeline))
return grpcServer
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package service is built for querying metadata and to expose it over daemon service.
package service

import (
Expand All @@ -24,15 +25,17 @@ type metricsHttpClient interface {
Get(url string) (*http.Response, error)
}

type pipelineMetricsQueryService struct {
isbSvcClient isbsvc.ISBService
pipeline *v1alpha1.Pipeline
httpClient metricsHttpClient
// pipelineMetadataQuery has the metadata required for the pipeline queries
type pipelineMetadataQuery struct {
isbSvcClient isbsvc.ISBService
pipeline *v1alpha1.Pipeline
httpClient metricsHttpClient
vertexWatermark *watermarkFetchers
}

// NewPipelineMetricsQueryService returns a new instance of pipelineMetricsQueryService
func NewPipelineMetricsQueryService(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) *pipelineMetricsQueryService {
return &pipelineMetricsQueryService{
// NewPipelineMetadataQuery returns a new instance of pipelineMetadataQuery
func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) *pipelineMetadataQuery {
ps := pipelineMetadataQuery{
isbSvcClient: isbSvcClient,
pipeline: pipeline,
httpClient: &http.Client{
Expand All @@ -42,10 +45,12 @@ func NewPipelineMetricsQueryService(isbSvcClient isbsvc.ISBService, pipeline *v1
Timeout: time.Second * 3,
},
}
ps.vertexWatermark = newVertexWatermarkFetcher(pipeline)
return &ps
}

// ListBuffers is used to obtain the all the edge buffers information of a pipeline
func (ps *pipelineMetricsQueryService) ListBuffers(ctx context.Context, req *daemon.ListBuffersRequest) (*daemon.ListBuffersResponse, error) {
func (ps *pipelineMetadataQuery) ListBuffers(ctx context.Context, req *daemon.ListBuffersRequest) (*daemon.ListBuffersResponse, error) {
log := logging.FromContext(ctx)
resp := new(daemon.ListBuffersResponse)

Expand Down Expand Up @@ -82,7 +87,7 @@ func (ps *pipelineMetricsQueryService) ListBuffers(ctx context.Context, req *dae
}

// GetBuffer is used to obtain one buffer information of a pipeline
func (ps *pipelineMetricsQueryService) GetBuffer(ctx context.Context, req *daemon.GetBufferRequest) (*daemon.GetBufferResponse, error) {
func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetBufferRequest) (*daemon.GetBufferResponse, error) {
bufferInfo, err := ps.isbSvcClient.GetBufferInfo(ctx, v1alpha1.Buffer{Name: *req.Buffer, Type: v1alpha1.EdgeBuffer})
if err != nil {
return nil, fmt.Errorf("failed to get information of buffer %q:%v", *req.Buffer, err)
Expand Down Expand Up @@ -117,7 +122,7 @@ func (ps *pipelineMetricsQueryService) GetBuffer(ctx context.Context, req *daemo
// GetVertexMetrics is used to query the metrics service and is used to obtain the processing rate of a given vertex for 1m, 5m and 15m.
// In the future maybe latency will also be added here?
// Should this method live here or maybe another file?
func (ps *pipelineMetricsQueryService) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
log := logging.FromContext(ctx)
resp := new(daemon.GetVertexMetricsResponse)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetVertexMetrics(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: pipelineName},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService := NewPipelineMetricsQueryService(client, pipeline)
pipelineMetricsQueryService := NewPipelineMetadataQuery(client, pipeline)

metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestGetBuffer(t *testing.T) {
}

ms := &mockIsbSvcClient{}
pipelineMetricsQueryService := NewPipelineMetricsQueryService(ms, pipeline)
pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline)

bufferName := "numaflow-system-simple-pipeline-in-cat"

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestListBuffers(t *testing.T) {
}

ms := &mockIsbSvcClient{}
pipelineMetricsQueryService := NewPipelineMetricsQueryService(ms, pipeline)
pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline)

req := &daemon.ListBuffersRequest{Pipeline: &pipelineName}

Expand Down
Loading

0 comments on commit ffd38a1

Please sign in to comment.