Skip to content

Commit

Permalink
export: use google protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang89 committed Oct 20, 2023
1 parent b74d3e4 commit 998bd5a
Show file tree
Hide file tree
Showing 20 changed files with 95 additions and 13,121 deletions.
8 changes: 1 addition & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@ build-linux:
GOOS=linux go build $(DEV_BUILD_FLAGS) $(BUILD_FLAGS) -o bin/tcpmon main.go

.PHONY: release
release: proto
release: gproto
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build $(BUILD_FLAGS) -o bin/x86_64/tcpmon github.com/zperf/tcpmon
GOOS=linux GOARCH=arm64 CGO_ENABLED=0 go build $(BUILD_FLAGS) -o bin/aarch64/tcpmon github.com/zperf/tcpmon

.PHONY: proto
proto:
mkdir -p tcpmon/tproto
protoc -Iproto --gogofaster_out=tcpmon/tproto tcpmon.proto
sed -i 's/package tcpmon/package tproto/g' tcpmon/tproto/tcpmon.pb.go

.PHONY: gproto
gproto:
mkdir -p tcpmon/gproto
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ require (
github.com/rs/zerolog v1.31.0
github.com/samber/lo v1.38.1
github.com/sasha-s/go-deadlock v0.3.1
github.com/schollz/progressbar/v3 v3.13.1
github.com/spf13/afero v1.10.0
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.17.0
github.com/stretchr/testify v1.8.4
github.com/umisama/go-regexpcache v0.0.0-20150417035358-2444a542492f
google.golang.org/protobuf v1.31.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -72,7 +74,6 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/schollz/progressbar/v3 v3.13.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
Expand All @@ -90,6 +91,5 @@ require (
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
10 changes: 5 additions & 5 deletions tcpmon/collector/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"

"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/parsing"
"github.com/zperf/tcpmon/tcpmon/tproto"
)

const procNet = "/proc/net/"
Expand All @@ -25,15 +25,15 @@ func (m *NetstatCollector) Collect(now time.Time) ([]byte, error) {
return nil, err
}

buf, err := proto.Marshal(&tproto.Metric{Body: &tproto.Metric_Net{Net: r}})
buf, err := proto.Marshal(&gproto.Metric{Body: &gproto.Metric_Net{Net: r}})
if err != nil {
return nil, errors.WithStack(err)
}
return buf, nil
}

func (m *NetstatCollector) doCollect(now time.Time) (*tproto.NetstatMetric, error) {
var metric tproto.NetstatMetric
func (m *NetstatCollector) doCollect(now time.Time) (*gproto.NetstatMetric, error) {
var metric gproto.NetstatMetric
metric.Timestamp = now.Unix()

err := CollectProc("snmp", &metric)
Expand All @@ -49,7 +49,7 @@ func (m *NetstatCollector) doCollect(now time.Time) (*tproto.NetstatMetric, erro
return &metric, nil
}

func CollectProc(t string, metric *tproto.NetstatMetric) error {
func CollectProc(t string, metric *gproto.NetstatMetric) error {
path := procNet + t

fd, err := os.Open(path)
Expand Down
10 changes: 5 additions & 5 deletions tcpmon/collector/nic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/go-cmd/cmd"
"github.com/gogo/protobuf/proto"

"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/parsing"
"github.com/zperf/tcpmon/tcpmon/tproto"
)

type NicCollector struct{ config *Config }
Expand All @@ -24,7 +24,7 @@ func (m *NicCollector) Collect(now time.Time) ([]byte, error) {
return nil, err
}

metric := &tproto.Metric{Body: &tproto.Metric_Nic{Nic: r}}
metric := &gproto.Metric{Body: &gproto.Metric_Nic{Nic: r}}
val, err := proto.Marshal(metric)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -33,7 +33,7 @@ func (m *NicCollector) Collect(now time.Time) ([]byte, error) {
return val, nil
}

func (m *NicCollector) doCollect(now time.Time) (*tproto.NicMetric, error) {
func (m *NicCollector) doCollect(now time.Time) (*gproto.NicMetric, error) {
c := cmd.NewCmd(m.config.PathIfconfig)
ctx, cancel := context.WithTimeout(context.Background(), m.config.Timeout)
defer cancel()
Expand All @@ -42,8 +42,8 @@ func (m *NicCollector) doCollect(now time.Time) (*tproto.NicMetric, error) {
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "ifconfig timeout")
case st := <-c.Start():
var nics tproto.NicMetric
nics.Type = tproto.MetricType_NIC
var nics gproto.NicMetric
nics.Type = gproto.MetricType_NIC
nics.Timestamp = now.Unix()

parsing.ParseIfconfigOutput(&nics, st.Stdout)
Expand Down
10 changes: 5 additions & 5 deletions tcpmon/collector/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/go-cmd/cmd"
"github.com/gogo/protobuf/proto"

"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/parsing"
"github.com/zperf/tcpmon/tcpmon/tproto"
)

// SocketCollector collect sockets statistics
Expand All @@ -27,7 +27,7 @@ func (m *SocketCollector) Collect(now time.Time) ([]byte, error) {
return nil, err
}

metric := &tproto.Metric{Body: &tproto.Metric_Tcp{Tcp: r}}
metric := &gproto.Metric{Body: &gproto.Metric_Tcp{Tcp: r}}
val, err := proto.Marshal(metric)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -36,7 +36,7 @@ func (m *SocketCollector) Collect(now time.Time) ([]byte, error) {
return val, nil
}

func (m *SocketCollector) doCollect(now time.Time) (*tproto.TcpMetric, error) {
func (m *SocketCollector) doCollect(now time.Time) (*gproto.TcpMetric, error) {
c := cmd.NewCmd(m.config.PathSS, m.config.ArgSS)
ctx, cancel := context.WithTimeout(context.Background(), m.config.Timeout)
defer cancel()
Expand All @@ -46,9 +46,9 @@ func (m *SocketCollector) doCollect(now time.Time) (*tproto.TcpMetric, error) {
return nil, errors.Wrap(ctx.Err(), "ss timeout")

case st := <-c.Start():
var t tproto.TcpMetric
var t gproto.TcpMetric
t.Timestamp = now.Unix()
t.Type = tproto.MetricType_TCP
t.Type = gproto.MetricType_TCP

parsing.ParseSS(&t, st.Stdout)
return &t, nil
Expand Down
23 changes: 13 additions & 10 deletions tcpmon/export/influxdb/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
"github.com/spf13/afero"
proto2 "google.golang.org/protobuf/proto"

"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/storage"
"github.com/zperf/tcpmon/tcpmon/tproto"
)

var ErrTimePointNotIncluded = errors.New("time point not included in this data file")
Expand Down Expand Up @@ -77,22 +77,22 @@ func (r *FastExporter) ReadRange(ra Range) ([]byte, error) {
return r.ReadAt(ra.Offset, ra.Len)
}

func (r *FastExporter) UnmarshalMetric(buf []byte) (*tproto.Metric, error) {
var m tproto.Metric
err := proto.Unmarshal(buf, &m)
func (r *FastExporter) UnmarshalMetric(buf []byte) (*gproto.Metric, error) {
var m gproto.Metric
err := proto2.Unmarshal(buf, &m)
if err != nil {
return nil, errors.Wrap(err, "parse failed")
}
return &m, nil
}

func getTimestamp(m *tproto.Metric) (time.Time, error) {
func getTimestamp(m *gproto.Metric) (time.Time, error) {
switch m := m.Body.(type) {
case *tproto.Metric_Tcp:
case *gproto.Metric_Tcp:
return time.Unix(m.Tcp.GetTimestamp(), 0), nil
case *tproto.Metric_Nic:
case *gproto.Metric_Nic:
return time.Unix(m.Nic.GetTimestamp(), 0), nil
case *tproto.Metric_Net:
case *gproto.Metric_Net:
return time.Unix(m.Net.GetTimestamp(), 0), nil
default:
return time.Time{}, errors.New("unknown metric type")
Expand Down Expand Up @@ -185,7 +185,10 @@ func (r *FastExporter) Export(w io.Writer, option *ExportOptions) error {

for _, rr := range ra {
if option.Bar != nil {
option.Bar.Add(1)
err = option.Bar.Add(1)
if err != nil {
log.Warn().Err(err).Msg("Add progress bar failed")
}
}
jobs <- rr
}
Expand Down
4 changes: 2 additions & 2 deletions tcpmon/export/influxdb/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/cockroachdb/errors"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"

"github.com/zperf/tcpmon/tcpmon/tproto"
"github.com/zperf/tcpmon/tcpmon/gproto"
)

type Importer struct {
Expand Down Expand Up @@ -37,7 +37,7 @@ func (im *Importer) Close() {
}
}

func (im *Importer) Submit(metric *tproto.Metric) error {
func (im *Importer) Submit(metric *gproto.Metric) error {
writeAPI := im.client.WriteAPI(im.option.Org, im.option.Bucket)
errCh := writeAPI.Errors()
conv := NewMetricConv(im.option.Hostname)
Expand Down
18 changes: 9 additions & 9 deletions tcpmon/export/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/samber/lo"

"github.com/zperf/tcpmon/tcpmon/tproto"
"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/tutils"
)

Expand All @@ -24,13 +24,13 @@ func New(hostname string, writer io.Writer) *LineProtocolExporter {
}
}

func (e *LineProtocolExporter) ExportMetric(m *tproto.Metric) {
func (e *LineProtocolExporter) ExportMetric(m *gproto.Metric) {
switch m := m.Body.(type) {
case *tproto.Metric_Tcp:
case *gproto.Metric_Tcp:
e.exportMetricTcp(m.Tcp)
case *tproto.Metric_Net:
case *gproto.Metric_Net:
e.exportMetricNet(m.Net)
case *tproto.Metric_Nic:
case *gproto.Metric_Nic:
e.exportMetricNic(m.Nic)
default:
log.Fatal().Msg("Unknown metric type")
Expand All @@ -46,10 +46,10 @@ func (e *LineProtocolExporter) Printf(format string, a ...any) {
tutils.FatalIf(err)
}

func (e *LineProtocolExporter) exportMetricTcp(m *tproto.TcpMetric) {
func (e *LineProtocolExporter) exportMetricTcp(m *gproto.TcpMetric) {
ts := m.GetTimestamp()
for _, s := range m.GetSockets() {
processText := strings.Join(lo.Map(s.GetProcesses(), func(p *tproto.ProcessInfo, _ int) string {
processText := strings.Join(lo.Map(s.GetProcesses(), func(p *gproto.ProcessInfo, _ int) string {
return fmt.Sprintf("cmd:%s;pid:%v;fd:%v", p.GetName(), p.GetPid(), p.GetFd())
}), ";")
var prefix string
Expand Down Expand Up @@ -123,7 +123,7 @@ func (e *LineProtocolExporter) exportMetricTcp(m *tproto.TcpMetric) {
}
}

func (e *LineProtocolExporter) exportMetricNic(m *tproto.NicMetric) {
func (e *LineProtocolExporter) exportMetricNic(m *gproto.NicMetric) {
ts := m.GetTimestamp()

for _, i := range m.GetIfaces() {
Expand All @@ -144,7 +144,7 @@ func (e *LineProtocolExporter) exportMetricNic(m *tproto.NicMetric) {
}
}

func (e *LineProtocolExporter) exportMetricNet(m *tproto.NetstatMetric) {
func (e *LineProtocolExporter) exportMetricNet(m *gproto.NetstatMetric) {
ts := m.GetTimestamp()
prefix := fmt.Sprintf("net,Hostname=%v", e.hostname)

Expand Down
18 changes: 9 additions & 9 deletions tcpmon/export/influxdb/metric_conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/samber/lo"

"github.com/zperf/tcpmon/tcpmon/tproto"
"github.com/zperf/tcpmon/tcpmon/gproto"
)

type MetricConv struct {
Expand All @@ -20,21 +20,21 @@ func NewMetricConv(hostname string) *MetricConv {
return &MetricConv{Hostname: hostname}
}

func (c *MetricConv) Metric(metric *tproto.Metric) (int64, []*write.Point) {
func (c *MetricConv) Metric(metric *gproto.Metric) (int64, []*write.Point) {
switch m := metric.Body.(type) {
case *tproto.Metric_Tcp:
case *gproto.Metric_Tcp:
return m.Tcp.Timestamp, c.Tcp(m.Tcp)
case *tproto.Metric_Net:
case *gproto.Metric_Net:
return m.Net.Timestamp, c.Net(m.Net)
case *tproto.Metric_Nic:
case *gproto.Metric_Nic:
return m.Nic.Timestamp, c.Nic(m.Nic)
default:
log.Fatal().Msg("Unknown metric type")
}
return 0, nil
}

func (c *MetricConv) Nic(metric *tproto.NicMetric) []*write.Point {
func (c *MetricConv) Nic(metric *gproto.NicMetric) []*write.Point {
ts := time.Unix(metric.GetTimestamp(), 0)
points := make([]*write.Point, 0)

Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *MetricConv) Nic(metric *tproto.NicMetric) []*write.Point {
return points
}

func (c *MetricConv) Tcp(metric *tproto.TcpMetric) []*write.Point {
func (c *MetricConv) Tcp(metric *gproto.TcpMetric) []*write.Point {
ts := time.Unix(metric.GetTimestamp(), 0)
points := make([]*write.Point, 0)

Expand All @@ -102,7 +102,7 @@ func (c *MetricConv) Tcp(metric *tproto.TcpMetric) []*write.Point {
"Hostname": c.Hostname,
}

processText := strings.Join(lo.Map(s.GetProcesses(), func(p *tproto.ProcessInfo, _ int) string {
processText := strings.Join(lo.Map(s.GetProcesses(), func(p *gproto.ProcessInfo, _ int) string {
return fmt.Sprintf("cmd:%s;pid:%v;fd:%v", p.GetName(), p.GetPid(), p.GetFd())
}), ";")
if processText != "" {
Expand Down Expand Up @@ -346,7 +346,7 @@ func (c *MetricConv) Tcp(metric *tproto.TcpMetric) []*write.Point {
return points
}

func (c *MetricConv) Net(metric *tproto.NetstatMetric) []*write.Point {
func (c *MetricConv) Net(metric *gproto.NetstatMetric) []*write.Point {
ts := time.Unix(metric.GetTimestamp(), 0)
points := make([]*write.Point, 0)

Expand Down
6 changes: 2 additions & 4 deletions tcpmon/export/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package export

import (
"github.com/zperf/tcpmon/tcpmon/tproto"
)
import "github.com/zperf/tcpmon/tcpmon/gproto"

type Exporter interface {
ExportMetric(metric *tproto.Metric)
ExportMetric(metric *gproto.Metric)
}
8 changes: 4 additions & 4 deletions tcpmon/parsing/ifconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package parsing
import (
"strings"

"github.com/zperf/tcpmon/tcpmon/tproto"
"github.com/zperf/tcpmon/tcpmon/gproto"
"github.com/zperf/tcpmon/tcpmon/tutils"
)

func ParseIfconfigOutput(nics *tproto.NicMetric, out []string) {
r := &tproto.IfaceMetric{}
func ParseIfconfigOutput(nics *gproto.NicMetric, out []string) {
r := &gproto.IfaceMetric{}
for _, line := range out {
if strings.Contains(line, ": flags=") {
fields := strings.FieldsFunc(line, func(c rune) bool {
return c == ':'
})
r = &tproto.IfaceMetric{}
r = &gproto.IfaceMetric{}
r.Name = fields[0]
} else if strings.Contains(line, "RX errors ") {
fields := strings.Fields(line)
Expand Down
Loading

0 comments on commit 998bd5a

Please sign in to comment.