From 95f213c39373249f37e98422e59232bdd6ae0f08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sat, 1 Feb 2020 23:41:09 -0700 Subject: [PATCH] Add RTT data to archives when available --- logscore/logscore.go | 1 + source/source.go | 29 ++++++++++++++++++++++++----- storage/bigquery/bigquery.go | 8 +------- storage/clickhouse/clickhouse.go | 9 +++++---- storage/fileavro/fileavro.go | 9 +++++++++ storage/fileavro/fileavro_test.go | 12 +++++++++++- 6 files changed, 51 insertions(+), 17 deletions(-) diff --git a/logscore/logscore.go b/logscore/logscore.go index ea630ac..af58d40 100644 --- a/logscore/logscore.go +++ b/logscore/logscore.go @@ -15,6 +15,7 @@ type LogScore struct { Score float64 `json:"sc" msgpack:"sc"` Step float64 `json:"st" msgpack:"st"` Offset *float64 `json:"of" msgpack:"of"` + RTT *int64 `json:"rtt" msgpack:"rtt"` Meta LogScoreMetadata `json:"attributes,omitempty"` } diff --git a/source/source.go b/source/source.go index d13d850..54ea00c 100644 --- a/source/source.go +++ b/source/source.go @@ -34,11 +34,17 @@ func (source *Source) Process(s storage.ArchiveStatus) error { lastID := int64(0) - hasAttributes, err := source.checkAttributes() + hasAttributes, err := source.checkField("attributes") if err != nil { return err } - log.Printf("%s has attributes: %t", source.Table, hasAttributes) + + hasRTT, err := source.checkField("rtt") + if err != nil { + return err + } + + log.Printf("%s has rtt: %t", source.Table, hasRTT) log.Printf("ModifiedOn: %s", s.ModifiedOn) @@ -91,12 +97,15 @@ func (source *Source) Process(s storage.ArchiveStatus) error { if hasAttributes { fields = fields + ",attributes" } + if hasRTT { + fields = fields + ",rtt" + } rows, err := db.DB.Query( fmt.Sprintf( `select %s from %s - where + where id > ? and ts != "0000-00-00 00:00:00" order by id @@ -117,6 +126,7 @@ func (source *Source) Process(s storage.ArchiveStatus) error { var monitorID sql.NullInt64 var offset sql.NullFloat64 + var rtt sql.NullInt64 var attributes sql.RawBytes ls := logscore.LogScore{} @@ -127,6 +137,9 @@ func (source *Source) Process(s storage.ArchiveStatus) error { if hasAttributes { fields = append(fields, &attributes) } + if hasRTT { + fields = append(fields, &rtt) + } err := rows.Scan(fields...) if err != nil { @@ -142,6 +155,12 @@ func (source *Source) Process(s storage.ArchiveStatus) error { ls.Offset = nil } + if rtt.Valid { + ls.RTT = &rtt.Int64 + } else { + ls.RTT = nil + } + if len(attributes) > 0 { err = json.Unmarshal(attributes, &ls.Meta) if err != nil { @@ -191,7 +210,7 @@ func (source *Source) Cleanup(status storage.ArchiveStatus) error { return c.Run(source, status) } -func (source *Source) checkAttributes() (bool, error) { +func (source *Source) checkField(field string) (bool, error) { type TableStruct struct { Field string `db:"Field"` @@ -210,7 +229,7 @@ func (source *Source) checkAttributes() (bool, error) { } for _, c := range columns { - if c.Field == "attributes" { + if c.Field == field { return true, nil } } diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 46543f4..808bc91 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -106,13 +106,7 @@ func (a *bqArchiver) Load(fh io.ReadWriteCloser) error { return fmt.Errorf("could not run job: %s", err) } - // r.Close() - - // if err := r.Close(); err != nil { - // return err - // } - - log.Printf("Loading BigQuery data with job %d", job.ID) + log.Printf("Loading BigQuery data with job %q", job.ID()) status, err := job.Wait(ctx) if err != nil { return fmt.Errorf("error checking job status: %s", err) diff --git a/storage/clickhouse/clickhouse.go b/storage/clickhouse/clickhouse.go index 8c7feff..13d9285 100644 --- a/storage/clickhouse/clickhouse.go +++ b/storage/clickhouse/clickhouse.go @@ -47,12 +47,12 @@ func NewArchiver() (storage.Archiver, error) { score Float32, step Float32, offset Nullable(Float64), + rtt Nullable(UInt32), leap Nullable(UInt8), error Nullable(String) ) engine=MergeTree PARTITION BY dt ORDER BY (server_id, ts) - `) if err != nil { return nil, err @@ -81,8 +81,8 @@ func (a *CHArchiver) Store(logscores []*logscore.LogScore) (int, error) { } stmt, err := tx.Prepare(` INSERT INTO log_scores - (dt, id, server_id, monitor_id, ts, score, step, offset, leap, error) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + (dt, id, server_id, monitor_id, ts, score, step, offset, rtt, leap, error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) if err != nil { return 0, err } @@ -106,7 +106,8 @@ func (a *CHArchiver) Store(logscores []*logscore.LogScore) (int, error) { l.Ts, // clickhouse figures out the right data in UTC from this l.ID, l.ServerID, l.MonitorID, l.Ts, - l.Score, l.Step, l.Offset, + l.Score, l.Step, + l.Offset, l.RTT, leap, lsError, ) if err != nil { diff --git a/storage/fileavro/fileavro.go b/storage/fileavro/fileavro.go index d3004d7..1512ffb 100644 --- a/storage/fileavro/fileavro.go +++ b/storage/fileavro/fileavro.go @@ -95,6 +95,7 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc {"name": "score", "type": "float"}, {"name": "step", "type": "float"}, {"name": "offset", "type": ["null", "float"]}, + {"name": "rtt", "type": ["null", "int"]}, {"name": "leap", "type": ["null", "int"]}, {"name": "error", "type": ["null", "string"]} ] @@ -135,6 +136,7 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc // } var offset interface{} + var rtt interface{} if ls.Offset == nil { offset = nil @@ -142,6 +144,12 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc offset = goavro.Union("float", *ls.Offset) } + if ls.RTT == nil { + rtt = nil + } else { + rtt = goavro.Union("int", *ls.RTT) + } + var leap interface{} if ls.Meta.Leap != 0 { leap = goavro.Union("int", int(ls.Meta.Leap)) @@ -160,6 +168,7 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc "score": ls.Score, "step": ls.Step, "offset": offset, + "rtt": rtt, "leap": leap, "error": lsError, } diff --git a/storage/fileavro/fileavro_test.go b/storage/fileavro/fileavro_test.go index 989c31e..8ebae03 100644 --- a/storage/fileavro/fileavro_test.go +++ b/storage/fileavro/fileavro_test.go @@ -1,6 +1,8 @@ package fileavro import ( + "io/ioutil" + "os" "testing" "go.ntppool.org/archiver/logscore" @@ -16,7 +18,12 @@ func TestStore(t *testing.T) { // t.Logf("tempdir: %s", tempdir) // defer os.RemoveAll(tempdir) - tempdir := "/tmp/avro" + tempdir, err := ioutil.TempDir("", "fileavro") + if err != nil || len(tempdir) == 0 { + t.Fatalf("could not create temporary directory: %s", err) + } + + defer os.RemoveAll(tempdir) av, err := NewArchiver(tempdir) if err != nil { @@ -24,6 +31,8 @@ func TestStore(t *testing.T) { t.Fail() } + rtt := int64(11234) + ls := []*logscore.LogScore{ &logscore.LogScore{ ID: 103535350, @@ -33,6 +42,7 @@ func TestStore(t *testing.T) { Score: 19.2, Step: 0.9, Offset: nil, + RTT: &rtt, // &float64{0.212313413}, Meta: logscore.LogScoreMetadata{Leap: 0}, },