Skip to content

Commit

Permalink
added status server and consumer metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
OS-M committed Jan 6, 2023
1 parent 02a681d commit 85f2742
Show file tree
Hide file tree
Showing 16 changed files with 1,032 additions and 24 deletions.
30 changes: 30 additions & 0 deletions cmd/dev/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"context"
"github.com/proxima-one/indexer-utils-go/pkg/consumer_metrics"
"github.com/proxima-one/indexer-utils-go/pkg/status_server"
"time"
)

func main() {
go func() {
serv := status_server.NewStatusServer()
serv.Start(context.Background(), 27000, 8080)
serv.UpdateNetworkIndexingStatus("test", time.Now(), "1")
for {
time.Sleep(time.Second)
}
}()
go func() {
metrics := consumer_metrics.NewConsumerMetricsServer().EnableConsumerMetrics(context.Background())
go metrics.Start(12228)
for {
metrics.EventProcessed("net1", time.Unix(time.Now().Unix()-100, 0))
time.Sleep(700 * time.Millisecond)
metrics.EventProcessed("net2", time.Unix(time.Now().Unix()-200, 0))
time.Sleep(10 * time.Millisecond)
}
}()
time.Sleep(1e18)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ require (
github.com/prometheus/client_golang v1.12.2
github.com/proxima-one/streamdb-client-go v1.0.1
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

require (
Expand All @@ -28,6 +30,4 @@ require (
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
125 changes: 125 additions & 0 deletions pkg/consumer_metrics/consumer-metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package consumer_metrics

import (
"context"
"fmt"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"net/http"
"time"
)

type eventProcessedEvent struct {
streamId string
eventTimestamp time.Time
timestamp time.Time
}

type IndexingServiceMetricsServer struct {
processedEvents chan eventProcessedEvent
}

func NewConsumerMetricsServer() *IndexingServiceMetricsServer {
return new(IndexingServiceMetricsServer)
}

func (cm *IndexingServiceMetricsServer) EnableConsumerMetrics(ctx context.Context) *IndexingServiceMetricsServer {
cm.processedEvents = make(chan eventProcessedEvent, 100)
processingDelay := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "",
Name: "index_processing_delay",
Help: "How many seconds lasted from last processed event." +
" If many of streams are processing - the worst one",
})
eventsDelay := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "",
Name: "index_events_delay",
Help: "How many seconds lasted from last processed event’s timestamp." +
" If many of streams are processing - the worst one",
})
messagesPerSec := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "",
Name: "index_processing_speed",
Help: "This metric tells how many messages are processed per second",
})
prometheus.MustRegister(processingDelay)
prometheus.MustRegister(eventsDelay)
prometheus.MustRegister(messagesPerSec)

go func() {
type streamData struct {
lastEventGotTime time.Time
lastEventTimestamp time.Time
eventsSinceLastUpdate int64
}

streams := make(map[string]*streamData)

lastUpdateTime := time.Now()

t := time.NewTicker(1 * time.Second)
for ctx.Err() == nil {
select {
case <-t.C:
lastEventGotTime := time.Now()
lastEventTimestamp := time.Now()
eventsSinceLastUpdate := int64(1e18)
for _, data := range streams {
if lastEventGotTime.After(data.lastEventGotTime) {
lastEventGotTime = data.lastEventGotTime
}
if lastEventTimestamp.After(data.lastEventTimestamp) {
lastEventTimestamp = data.lastEventTimestamp
}
if eventsSinceLastUpdate > data.eventsSinceLastUpdate {
eventsSinceLastUpdate = data.eventsSinceLastUpdate
}
data.eventsSinceLastUpdate = 0
}

processingDelay.Set(time.Since(lastEventGotTime).Seconds())
eventsDelay.Set(time.Since(lastEventTimestamp).Seconds())
messagesPerSec.Set(
1000 * float64(eventsSinceLastUpdate) / float64(time.Since(lastUpdateTime).Milliseconds()))

lastUpdateTime = time.Now()

case event := <-cm.processedEvents:
if streams[event.streamId] == nil {
streams[event.streamId] = new(streamData)
}
streams[event.streamId].eventsSinceLastUpdate++
streams[event.streamId].lastEventTimestamp = event.eventTimestamp
streams[event.streamId].lastEventGotTime = event.timestamp

case <-ctx.Done():
return
}
}
}()
return cm
}

func (cm *IndexingServiceMetricsServer) EnableServerMetrics(server *grpc.Server) *IndexingServiceMetricsServer {
grpcPrometheus.EnableHandlingTimeHistogram()
grpcPrometheus.Register(server)
return cm
}

func (cm *IndexingServiceMetricsServer) Start(port int) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}

func (cm *IndexingServiceMetricsServer) EventProcessed(stream string, timestamp time.Time) {
if cm.processedEvents == nil {
panic("cannot use consumer metrics with server-only IndexingServiceMetricsServer")
}
cm.processedEvents <- eventProcessedEvent{
streamId: stream,
eventTimestamp: timestamp,
timestamp: time.Now(),
}
}
10 changes: 4 additions & 6 deletions pkg/grpc_gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"google.golang.org/grpc/credentials/insecure"
"io/fs"
"log"
"mime"
"net/http"
"strings"
Expand Down Expand Up @@ -38,11 +37,11 @@ func getProtoFileHandler(folder fs.FS) http.Handler {
}

// Run runs the gRPC-Gateway, dialling the provided address.
func Run(grpcAddress, port, protoFileName string, protoFileFolder fs.FS,
func Run(ctx context.Context, grpcAddress string, httpPort int, protoFileFolder fs.FS, protoFileName string,
registerServiceHandler func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error) error {

conn, err := grpc.DialContext(
context.Background(),
ctx,
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
Expand All @@ -52,15 +51,15 @@ func Run(grpcAddress, port, protoFileName string, protoFileFolder fs.FS,
}

gwmux := runtime.NewServeMux()
err = registerServiceHandler(context.Background(), gwmux, conn)
err = registerServiceHandler(ctx, gwmux, conn)
if err != nil {
return fmt.Errorf("failed to register openapi: %w", err)
}

openAPIHandler := getOpenAPIHandler()
protoFileHandler := getProtoFileHandler(protoFileFolder)

gatewayAddr := "0.0.0.0:" + port
gatewayAddr := fmt.Sprintf("0.0.0.0:%d", httpPort)
gwServer := &http.Server{
Addr: gatewayAddr,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -76,6 +75,5 @@ func Run(grpcAddress, port, protoFileName string, protoFileFolder fs.FS,
}),
}

log.Println("Serving gRPC-Gateway and OpenAPI Documentation on http://", gatewayAddr)
return fmt.Errorf("serving gRPC-Gateway server error: %w", gwServer.ListenAndServe())
}
15 changes: 0 additions & 15 deletions pkg/grpc_metrics/grpc_metrics.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/utils/logger.go → pkg/logger/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package logger

import (
"context"
Expand Down
15 changes: 15 additions & 0 deletions pkg/status_server/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: v1
managed:
enabled: true
plugins:
- remote: buf.build/library/plugins/go:v1.27.1-1
out: .
opt: paths=source_relative
- remote: buf.build/library/plugins/go-grpc:v1.1.0-2
out: .
opt: paths=source_relative,require_unimplemented_servers=false
- remote: buf.build/grpc-ecosystem/plugins/grpc-gateway:v2.7.2-1
out: .
opt: paths=source_relative
- remote: buf.build/grpc-ecosystem/plugins/openapiv2:v2.7.2-1
out: .
11 changes: 11 additions & 0 deletions pkg/status_server/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: googleapis
repository: googleapis
commit: 80720a488c9a414bb8d4a9f811084989
- remote: buf.build
owner: grpc-ecosystem
repository: grpc-gateway
commit: 00116f302b12478b85deb33b734e026c
13 changes: 13 additions & 0 deletions pkg/status_server/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: v1
build:
excludes:
- deploy/node_modules
deps:
- buf.build/googleapis/googleapis
- buf.build/grpc-ecosystem/grpc-gateway
lint:
use:
- DEFAULT
breaking:
use:
- FILE
6 changes: 6 additions & 0 deletions pkg/status_server/internal/proto/embed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package internal

import "embed"

//go:embed *
var ProtoDir embed.FS
Loading

0 comments on commit 85f2742

Please sign in to comment.