Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing pipedstat model #2261

Merged
merged 5 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ genrule(
# gazelle:exclude pkg/model/logblock.pb.validate.go
# gazelle:exclude pkg/model/notificationevent.pb.validate.go
# gazelle:exclude pkg/model/piped.pb.validate.go
# gazelle:exclude pkg/model/piped_stat.pb.validate.go
# gazelle:exclude pkg/model/piped_stats.pb.validate.go
# gazelle:exclude pkg/model/planpreview.pb.validate.go
# gazelle:exclude pkg/model/project.pb.validate.go
Expand Down
2 changes: 1 addition & 1 deletion cmd/pipecd/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
}

rd := redis.NewRedis(s.cacheAddress, "")
statCache := rediscache.NewTTLHashCache(rd, pipedStatTTL, defaultPipedStatHashKey)
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)
psb := pipedstatsbuilder.NewPipedStatsBuilder(statCache, t.Logger)

// Start running admin server.
Expand Down
3 changes: 1 addition & 2 deletions cmd/pipecd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var (

const (
defaultPipedStatHashKey = "HASHKEY:PIPED:STATS"
pipedStatTTL = 2 * time.Minute
)

type httpHandler interface {
Expand Down Expand Up @@ -190,7 +189,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
cmds := commandstore.NewStore(ds, cache, t.Logger)
is := insightstore.NewStore(fs)
cmdOutputStore := commandoutputstore.NewStore(fs, t.Logger)
statCache := rediscache.NewTTLHashCache(rd, pipedStatTTL, defaultPipedStatHashKey)
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)

// Start a gRPC server for handling PipedAPI requests.
{
Expand Down
14 changes: 13 additions & 1 deletion pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpcapi

import (
"context"
"encoding/json"
"errors"
"time"

Expand Down Expand Up @@ -98,7 +99,18 @@ func (a *PipedAPI) ReportStat(ctx context.Context, req *pipedservice.ReportStatR
if err != nil {
return nil, err
}
if err := a.pipedStatCache.Put(pipedID, req.PipedStats); err != nil {

now := time.Now().Unix()
val, err := json.Marshal(model.PipedStat{PipedId: pipedID, Metrics: req.PipedStats, Timestamp: now})
if err != nil {
a.logger.Error("failed to store the reported piped stat",
zap.String("piped-id", pipedID),
zap.Error(err),
)
return nil, status.Error(codes.Internal, "failed to encode the reported piped stat")
}

if err := a.pipedStatCache.Put(pipedID, val); err != nil {
a.logger.Error("failed to store the reported piped stat",
zap.String("piped-id", pipedID),
zap.Error(err),
Expand Down
2 changes: 2 additions & 0 deletions pkg/app/ops/pipedstatsbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/cache:go_default_library",
"//pkg/model:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
Expand All @@ -19,6 +20,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/cache:go_default_library",
"//pkg/model:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
"@org_uber_go_zap//:go_default_library",
Expand Down
12 changes: 10 additions & 2 deletions pkg/app/ops/pipedstatsbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package pipedstatsbuilder

import (
"bytes"
"encoding/json"
"errors"
"io"

"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/cache"
"github.com/pipe-cd/pipe/pkg/model"
)

type PipedStatsBuilder struct {
Expand All @@ -47,10 +49,16 @@ func (b *PipedStatsBuilder) Build() (io.Reader, error) {
value, okValue := v.([]byte)
if !okValue {
err = errors.New("error value not a bulk of string value")
b.logger.Error("failed to marshal piped stat data", zap.Error(err))
b.logger.Error("failed to unmarshal piped stat data", zap.Error(err))
return nil, err
}
data = append(data, value)
ps := model.PipedStat{}
if err = json.Unmarshal(value, &ps); err != nil {
b.logger.Error("failed to unmarshal piped stat data", zap.Error(err))
return nil, err
}
// TODO: Filter returning piped metrics by value timestamp.
data = append(data, ps.Metrics)
}
return bytes.NewReader(bytes.Join(data, []byte("\n"))), nil
}
6 changes: 5 additions & 1 deletion pkg/app/ops/pipedstatsbuilder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
package pipedstatsbuilder

import (
"encoding/json"
"io"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/cache"
"github.com/pipe-cd/pipe/pkg/model"
)

type mockBuilderBackend struct {
Expand All @@ -48,7 +51,8 @@ func (m *mockBuilderBackend) GetAll() (map[string]interface{}, error) {
if err != nil {
return nil, err
}
out[file] = data
val, _ := json.Marshal(model.PipedStat{Metrics: data, Timestamp: time.Now().Unix()})
out[file] = val
}
return out, nil
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/cache/rediscache/hashcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package rediscache

import (
"errors"
"time"

redigo "github.com/gomodule/redigo/redis"

Expand All @@ -37,14 +36,6 @@ func NewHashCache(redis redis.Redis, key string) *RedisHashCache {
}
}

func NewTTLHashCache(redis redis.Redis, ttl time.Duration, key string) *RedisHashCache {
return &RedisHashCache{
redis: redis,
ttl: uint(ttl.Seconds()),
key: key,
}
}

func (r *RedisHashCache) Get(k string) (interface{}, error) {
conn := r.redis.Get()
defer conn.Close()
Expand Down
2 changes: 2 additions & 0 deletions pkg/datastore/pipedstatsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var pipedStatsFactory = func() interface{} {
return &model.PipedStats{}
}

// Deprecated: PipedStats model is deprecated, along with its store interface.
khanhtc1202 marked this conversation as resolved.
Show resolved Hide resolved
type PipedStatsStore interface {
AddPipedStats(ctx context.Context, ps *model.PipedStats) error
ListPipedStatss(ctx context.Context, opts ListOptions) ([]model.PipedStats, error)
Expand All @@ -37,6 +38,7 @@ type pipedStatsStore struct {
nowFunc func() time.Time
}

// Deprecated: PipedStats model is deprecated, along with its store interface.
khanhtc1202 marked this conversation as resolved.
Show resolved Hide resolved
func NewPipedStatsStore(ds DataStore) PipedStatsStore {
return &pipedStatsStore{
backend: backend{
Expand Down
1 change: 1 addition & 0 deletions pkg/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ proto_library(
"logblock.proto",
"notificationevent.proto",
"piped.proto",
"piped_stat.proto",
"piped_stats.proto",
"planpreview.proto",
"project.proto",
Expand Down
26 changes: 26 additions & 0 deletions pkg/model/piped_stat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2021 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package pipe.model;
option go_package = "github.com/pipe-cd/pipe/pkg/model";

import "validate/validate.proto";

message PipedStat {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding PipedID into this message?
I know we are having that ID from the map key but ID inside its data could be helpful as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍 Btw I'm thinking about the current model name, just want to make it PipedMetrics. In that case, PipedMetrics.PipedID and PipedMetrics.Stat looks better than PipedStat.PipedID and PipedStat.Stat as currently. Just wonder is that name too generics or something 🤔 How do you think about that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think PipedMetrics is good too. But currently, our RPC name is Report...Stats, right?
So maybe we should keep that model name as PipedStat and change its Stat field to Metrics.
Or we can reuse the old PipedStats model, deprecate unused fields and add new fields like PipedID, Metrics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe we should keep that model name as PipedStat and change its Stat field to Metrics.

Get your point, lets me adopt this 👍 ( Personally don't think the name should be XXXStats since this message covers only a single value of piped committed stat at a time )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by ae07feb 🙏

string piped_id = 1 [(validate.rules).string.min_len = 1];
bytes metrics = 2;
int64 timestamp = 10 [(validate.rules).int64.gt = 0];
}
1 change: 1 addition & 0 deletions pkg/model/piped_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ message PrometheusMetrics {
}

message PipedStats {
option deprecated = true;
string version = 1 [(validate.rules).string.min_len = 1];
int64 timestamp = 2 [(validate.rules).int64.gt = 0];
repeated PrometheusMetrics prometheus_metrics = 3;
Expand Down