Skip to content

Commit

Permalink
new ConsumeStatusServer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
OS-M committed Jan 12, 2023
1 parent 05e28a7 commit 542e390
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 18 deletions.
10 changes: 9 additions & 1 deletion cmd/dev/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ func main() {
go func() {
serv := consume_status.NewConsumeStatusServer()
serv.Start(context.Background(), 27000, 8080)
serv.UpdateNetworkIndexingStatus("test", time.Now(), "1")
serv.RegisterStream("eth-main-0", "eth-main")
serv.RegisterStream("eth-main-1", "eth-main")
serv.RegisterStream("polygon-mumbai-0", "polygon-mumbai")
serv.RegisterStream("polygon-mumbai-1", "polygon-mumbai")

serv.UpdateStreamStatus("eth-main-0", time.Unix(time.Now().Unix()-1000000, 0), "100")
serv.UpdateStreamStatus("eth-main-1", time.Unix(time.Now().Unix(), 0), "200")
serv.UpdateStreamStatus("polygon-mumbai-0", time.Unix(time.Now().Unix()-1200000, 0), "110")
serv.UpdateStreamStatus("polygon-mumbai-1", time.Unix(time.Now().Unix()-100, 0), "250")
for {
time.Sleep(time.Second)
}
Expand Down
66 changes: 51 additions & 15 deletions pkg/consume_status/consume_status_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,31 @@ import (
"fmt"
pb "github.com/proxima-one/indexer-utils-go/pkg/consume_status/internal/proto"
"github.com/proxima-one/indexer-utils-go/pkg/grpc_gateway"
"github.com/proxima-one/indexer-utils-go/pkg/utils"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"net"
"sync"
"time"
)

type networkIndexingStatus struct {
Network string
type indexingStatus struct {
Timestamp time.Time
BlockNumber string
}

type ConsumeStatusServer struct {
states map[string]*networkIndexingStatus
mu sync.RWMutex
statusByStreamId map[string]indexingStatus
networkByStreamId map[string]string
}

func NewConsumeStatusServer() *ConsumeStatusServer {
return &ConsumeStatusServer{
states: make(map[string]*networkIndexingStatus),
statusByStreamId: make(map[string]indexingStatus),
networkByStreamId: make(map[string]string),
}
}

Expand Down Expand Up @@ -53,24 +57,56 @@ func (s *ConsumeStatusServer) Start(ctx context.Context, grpcPort, httpPort int)
}()
}

func (s *ConsumeStatusServer) UpdateNetworkIndexingStatus(network string, timestamp time.Time, blockNumber string) {
if _, ok := s.states[network]; !ok {
s.states[network] = &networkIndexingStatus{}
func (s *ConsumeStatusServer) RegisterStream(streamId, network string) {
s.mu.Lock()
s.networkByStreamId[streamId] = network
s.mu.Unlock()
}

// UpdateStreamStatus updates stream status of a stream. Stream must be already registered
func (s *ConsumeStatusServer) UpdateStreamStatus(streamId string, timestamp time.Time, blockNumber string) {
s.mu.RLock()
if _, ok := s.networkByStreamId[streamId]; !ok {
s.mu.RUnlock()
utils.PanicOnError(fmt.Errorf("stream %s is not registered", streamId))
}
s.mu.RUnlock()

s.mu.Lock()
defer s.mu.Unlock()
s.statusByStreamId[streamId] = indexingStatus{
Timestamp: timestamp,
BlockNumber: blockNumber,
}
s.states[network].Network = network
s.states[network].Timestamp = timestamp
s.states[network].BlockNumber = blockNumber
}

func (s *ConsumeStatusServer) GetStatus(_ context.Context, _ *emptypb.Empty) (*pb.GetStatusResponse, error) {
res := &pb.GetStatusResponse{Networks: make([]*pb.NetworkIndexingStatus, 0)}
s.mu.RLock()
networkStatuses := make(map[string]*indexingStatus)
for streamId, streamStatus := range s.statusByStreamId {
network := s.networkByStreamId[streamId]
if _, ok := networkStatuses[network]; !ok {
networkStatuses[network] = &indexingStatus{
Timestamp: streamStatus.Timestamp,
BlockNumber: streamStatus.BlockNumber,
}
} else {
networkStatus := networkStatuses[network]
if utils.MustConvStrToInt64(streamStatus.BlockNumber) < utils.MustConvStrToInt64(networkStatus.BlockNumber) {
networkStatus.Timestamp = streamStatus.Timestamp
networkStatus.BlockNumber = streamStatus.BlockNumber
}
}
}
s.mu.RUnlock()

for _, state := range s.states {
res := &pb.GetStatusResponse{Networks: make([]*pb.NetworkIndexingStatus, 0)}
for network, status := range networkStatuses {
res.Networks = append(res.Networks, &pb.NetworkIndexingStatus{
Network: state.Network,
Network: network,
Status: &pb.IndexingStatus{
Timestamp: timestamppb.New(state.Timestamp),
BlockNumber: &state.BlockNumber,
Timestamp: timestamppb.New(status.Timestamp),
BlockNumber: &status.BlockNumber,
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (logger *Logger) StartLiveStreamUpdate(
streamId string,
startOffset proximaclient.Offset,
findStream func(stream string) (*proximaclient.Stream, error),
timeout time.Duration) {
interval time.Duration) {

t := time.NewTicker(timeout)
t := time.NewTicker(interval)
defer t.Stop()
for ctx.Err() == nil {
lastOffset := lastOffsetForStream(streamId, findStream)
Expand Down
6 changes: 6 additions & 0 deletions pkg/utils/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ func StringToBigInt(str string) *big.Int {
func StringToInt64(s string) (int64, error) {
return strconv.ParseInt(s, 10, 64)
}

func MustConvStrToInt64(s string) int64 {
res, err := strconv.ParseInt(s, 10, 64)
PanicOnError(err)
return res
}

0 comments on commit 542e390

Please sign in to comment.