Skip to content

Commit

Permalink
Add RTT data to archives when available
Browse files Browse the repository at this point in the history
  • Loading branch information
abh committed Feb 2, 2020
1 parent ea86aed commit 95f213c
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 17 deletions.
1 change: 1 addition & 0 deletions logscore/logscore.go
Expand Up @@ -15,6 +15,7 @@ type LogScore struct {
Score float64 `json:"sc" msgpack:"sc"`
Step float64 `json:"st" msgpack:"st"`
Offset *float64 `json:"of" msgpack:"of"`
RTT *int64 `json:"rtt" msgpack:"rtt"`
Meta LogScoreMetadata `json:"attributes,omitempty"`
}

Expand Down
29 changes: 24 additions & 5 deletions source/source.go
Expand Up @@ -34,11 +34,17 @@ func (source *Source) Process(s storage.ArchiveStatus) error {

lastID := int64(0)

hasAttributes, err := source.checkAttributes()
hasAttributes, err := source.checkField("attributes")
if err != nil {
return err
}
log.Printf("%s has attributes: %t", source.Table, hasAttributes)

hasRTT, err := source.checkField("rtt")
if err != nil {
return err
}

log.Printf("%s has rtt: %t", source.Table, hasRTT)

log.Printf("ModifiedOn: %s", s.ModifiedOn)

Expand Down Expand Up @@ -91,12 +97,15 @@ func (source *Source) Process(s storage.ArchiveStatus) error {
if hasAttributes {
fields = fields + ",attributes"
}
if hasRTT {
fields = fields + ",rtt"
}

rows, err := db.DB.Query(
fmt.Sprintf(
`select %s
from %s
where
where
id > ?
and ts != "0000-00-00 00:00:00"
order by id
Expand All @@ -117,6 +126,7 @@ func (source *Source) Process(s storage.ArchiveStatus) error {

var monitorID sql.NullInt64
var offset sql.NullFloat64
var rtt sql.NullInt64
var attributes sql.RawBytes

ls := logscore.LogScore{}
Expand All @@ -127,6 +137,9 @@ func (source *Source) Process(s storage.ArchiveStatus) error {
if hasAttributes {
fields = append(fields, &attributes)
}
if hasRTT {
fields = append(fields, &rtt)
}

err := rows.Scan(fields...)
if err != nil {
Expand All @@ -142,6 +155,12 @@ func (source *Source) Process(s storage.ArchiveStatus) error {
ls.Offset = nil
}

if rtt.Valid {
ls.RTT = &rtt.Int64
} else {
ls.RTT = nil
}

if len(attributes) > 0 {
err = json.Unmarshal(attributes, &ls.Meta)
if err != nil {
Expand Down Expand Up @@ -191,7 +210,7 @@ func (source *Source) Cleanup(status storage.ArchiveStatus) error {
return c.Run(source, status)
}

func (source *Source) checkAttributes() (bool, error) {
func (source *Source) checkField(field string) (bool, error) {

type TableStruct struct {
Field string `db:"Field"`
Expand All @@ -210,7 +229,7 @@ func (source *Source) checkAttributes() (bool, error) {
}

for _, c := range columns {
if c.Field == "attributes" {
if c.Field == field {
return true, nil
}
}
Expand Down
8 changes: 1 addition & 7 deletions storage/bigquery/bigquery.go
Expand Up @@ -106,13 +106,7 @@ func (a *bqArchiver) Load(fh io.ReadWriteCloser) error {
return fmt.Errorf("could not run job: %s", err)
}

// r.Close()

// if err := r.Close(); err != nil {
// return err
// }

log.Printf("Loading BigQuery data with job %d", job.ID)
log.Printf("Loading BigQuery data with job %q", job.ID())
status, err := job.Wait(ctx)
if err != nil {
return fmt.Errorf("error checking job status: %s", err)
Expand Down
9 changes: 5 additions & 4 deletions storage/clickhouse/clickhouse.go
Expand Up @@ -47,12 +47,12 @@ func NewArchiver() (storage.Archiver, error) {
score Float32,
step Float32,
offset Nullable(Float64),
rtt Nullable(UInt32),
leap Nullable(UInt8),
error Nullable(String)
) engine=MergeTree
PARTITION BY dt
ORDER BY (server_id, ts)
`)
if err != nil {
return nil, err
Expand Down Expand Up @@ -81,8 +81,8 @@ func (a *CHArchiver) Store(logscores []*logscore.LogScore) (int, error) {
}
stmt, err := tx.Prepare(`
INSERT INTO log_scores
(dt, id, server_id, monitor_id, ts, score, step, offset, leap, error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
(dt, id, server_id, monitor_id, ts, score, step, offset, rtt, leap, error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return 0, err
}
Expand All @@ -106,7 +106,8 @@ func (a *CHArchiver) Store(logscores []*logscore.LogScore) (int, error) {
l.Ts, // clickhouse figures out the right data in UTC from this
l.ID, l.ServerID, l.MonitorID,
l.Ts,
l.Score, l.Step, l.Offset,
l.Score, l.Step,
l.Offset, l.RTT,
leap, lsError,
)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions storage/fileavro/fileavro.go
Expand Up @@ -95,6 +95,7 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc
{"name": "score", "type": "float"},
{"name": "step", "type": "float"},
{"name": "offset", "type": ["null", "float"]},
{"name": "rtt", "type": ["null", "int"]},
{"name": "leap", "type": ["null", "int"]},
{"name": "error", "type": ["null", "string"]}
]
Expand Down Expand Up @@ -135,13 +136,20 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc
// }

var offset interface{}
var rtt interface{}

if ls.Offset == nil {
offset = nil
} else {
offset = goavro.Union("float", *ls.Offset)
}

if ls.RTT == nil {
rtt = nil
} else {
rtt = goavro.Union("int", *ls.RTT)
}

var leap interface{}
if ls.Meta.Leap != 0 {
leap = goavro.Union("int", int(ls.Meta.Leap))
Expand All @@ -160,6 +168,7 @@ func (a *AvroArchiver) StoreWriter(fh io.ReadWriter, logscores []*logscore.LogSc
"score": ls.Score,
"step": ls.Step,
"offset": offset,
"rtt": rtt,
"leap": leap,
"error": lsError,
}
Expand Down
12 changes: 11 additions & 1 deletion storage/fileavro/fileavro_test.go
@@ -1,6 +1,8 @@
package fileavro

import (
"io/ioutil"
"os"
"testing"

"go.ntppool.org/archiver/logscore"
Expand All @@ -16,14 +18,21 @@ func TestStore(t *testing.T) {
// t.Logf("tempdir: %s", tempdir)
// defer os.RemoveAll(tempdir)

tempdir := "/tmp/avro"
tempdir, err := ioutil.TempDir("", "fileavro")
if err != nil || len(tempdir) == 0 {
t.Fatalf("could not create temporary directory: %s", err)
}

defer os.RemoveAll(tempdir)

av, err := NewArchiver(tempdir)
if err != nil {
t.Logf("could not NewArchiver(): %s", err)
t.Fail()
}

rtt := int64(11234)

ls := []*logscore.LogScore{
&logscore.LogScore{
ID: 103535350,
Expand All @@ -33,6 +42,7 @@ func TestStore(t *testing.T) {
Score: 19.2,
Step: 0.9,
Offset: nil,
RTT: &rtt,
// &float64{0.212313413},
Meta: logscore.LogScoreMetadata{Leap: 0},
},
Expand Down

0 comments on commit 95f213c

Please sign in to comment.