Skip to content

Commit

Permalink
Merge cd50172 into d49bbe3
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-soltesz committed Jun 10, 2022
2 parents d49bbe3 + cd50172 commit fa4740f
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 160 deletions.
4 changes: 0 additions & 4 deletions cmd/etl_worker/etl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/m-lab/etl/task"
"github.com/m-lab/etl/worker"

"github.com/m-lab/annotation-service/site"

// Enable profiling. For more background and usage information, see:
// https://blog.golang.org/profiling-go-programs
_ "net/http/pprof"
Expand Down Expand Up @@ -276,8 +274,6 @@ func main() {
log.Println("To resolve oauth problems, run 'gcloud auth application-default login'")
}

go site.MustReload(mainCtx)

// Enable block profiling
runtime.SetBlockProfileRate(1000000) // One event per msec.

Expand Down
24 changes: 0 additions & 24 deletions factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ package factory
import (
"context"

v2 "github.com/m-lab/annotation-service/api/v2"

"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/parser"
"github.com/m-lab/etl/row"
)

Expand Down Expand Up @@ -35,11 +32,6 @@ func NewError(dt, detail string, code int, err error) etl.ProcessingError {
return processingError{dt, detail, code, err}
}

// AnnotatorFactory provides Get() which always returns a new or existing Annotator.
type AnnotatorFactory interface {
Get(context.Context, etl.DataPath) (v2.Annotator, etl.ProcessingError)
}

// SinkFactory provides Get() which may return a new or existing Sink.
// If existing Sink, the Commit method must support concurrent calls.
// Existing Sink may or may not respect the context.
Expand All @@ -51,19 +43,3 @@ type SinkFactory interface {
type SourceFactory interface {
Get(context.Context, etl.DataPath) (etl.TestSource, etl.ProcessingError)
}

//=======================================================================
// Implementations
//=======================================================================

type defaultAnnotatorFactory struct{}

// Get implements AnnotatorFactory.Get
func (ann *defaultAnnotatorFactory) Get(ctx context.Context, dp etl.DataPath) (v2.Annotator, etl.ProcessingError) {
return &parser.NullAnnotator{}, nil
}

// DefaultAnnotatorFactory returns the annotation service annotator.
func DefaultAnnotatorFactory() AnnotatorFactory {
return &defaultAnnotatorFactory{}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/iancoleman/strcase v0.2.0
github.com/kr/pretty v0.2.1
github.com/m-lab/annotation-service v0.0.0-20210713124633-fa227b3d5b2f
github.com/m-lab/annotation-service v0.0.0-20210713124633-fa227b3d5b2f // indirect
github.com/m-lab/etl-gardener v0.0.0-20210910143655-d4bda5bfc75d
github.com/m-lab/go v0.1.47
github.com/m-lab/ndt-server v0.20.9
Expand Down
12 changes: 0 additions & 12 deletions parser/annotation.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package parser

import (
"context"
"encoding/json"
"log"
"strings"
Expand All @@ -10,8 +9,6 @@ import (
"cloud.google.com/go/bigquery"

"cloud.google.com/go/civil"
"github.com/m-lab/annotation-service/api"
v2as "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
Expand All @@ -30,15 +27,6 @@ type AnnotationParser struct {
suffix string
}

// NullAnnotator mimicks the annotation-service API, and always returns an empty
// result without any network connections.
type NullAnnotator struct{}

// GetAnnotations always returns an empty annotation result.
func (ann *NullAnnotator) GetAnnotations(ctx context.Context, date time.Time, ips []string, info ...string) (*v2as.Response, error) {
return &v2as.Response{AnnotatorDate: time.Now(), Annotations: make(map[string]*api.Annotations, 0)}, nil
}

// NewAnnotationParser creates a new parser for annotation data.
func NewAnnotationParser(sink row.Sink, label, suffix string) etl.Parser {
bufSize := etl.ANNOTATION.BQBufferSize()
Expand Down
18 changes: 0 additions & 18 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
package parser_test

import (
"context"
"fmt"
"log"
"os"
"testing"
"time"

"cloud.google.com/go/bigquery"
"github.com/m-lab/annotation-service/api"
v2 "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/parser"
Expand Down Expand Up @@ -51,20 +47,6 @@ func (ti *countingInserter) Flush() error {
return nil
}

// newFakeAnnotator creates a new annotator that injects the given annotation
// responses for unit testing.
func newFakeAnnotator(ann map[string]*api.Annotations) *fakeAnnotator {
return &fakeAnnotator{ann: ann}
}

type fakeAnnotator struct {
ann map[string]*api.Annotations
}

func (ann *fakeAnnotator) GetAnnotations(ctx context.Context, date time.Time, ips []string, info ...string) (*v2.Response, error) {
return &v2.Response{AnnotatorDate: time.Now(), Annotations: ann.ann}, nil
}

func TestNormalizeIP(t *testing.T) {
tests := []struct {
name string
Expand Down
34 changes: 0 additions & 34 deletions parser/tcpinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"cloud.google.com/go/civil"
"github.com/m-lab/annotation-service/api"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/parser"
"github.com/m-lab/etl/schema"
Expand Down Expand Up @@ -55,39 +54,6 @@ func fileSource(fn string) (etl.TestSource, error) {
RetryBaseTime: timeout, TableBase: "test", PathDate: civil.Date{Year: 2020, Month: 6, Day: 11}}, nil
}

var tcpInfoAnno = map[string]*api.Annotations{
// client ip.
"35.225.75.192": &api.Annotations{
Geo: &api.GeolocationIP{
ContinentCode: "NA",
CountryCode: "US",
Latitude: 1.0,
Longitude: 2.0,
},
Network: &api.ASData{
ASNumber: 1234,
Systems: []api.System{
{ASNs: []uint32{1234}},
},
},
},
// server ip.
"195.89.146.242": &api.Annotations{
Geo: &api.GeolocationIP{
ContinentCode: "NA",
CountryCode: "US",
Latitude: 1.0,
Longitude: 2.0,
},
Network: &api.ASData{
ASNumber: 1234,
Systems: []api.System{
{ASNs: []uint32{1234}},
},
},
},
}

type inMemorySink struct {
data []interface{}
committed int
Expand Down
26 changes: 6 additions & 20 deletions row/row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import (
"time"

"github.com/m-lab/etl/row"

"github.com/m-lab/annotation-service/api"
)

// Implement parser.Annotatable

type Row struct {
client string
server string
clientAnn *api.Annotations
serverAnn *api.Annotations
client string
server string
}

type BadRow struct{}
Expand All @@ -29,16 +25,6 @@ func (row *Row) GetServerIP() string {
return row.server
}

func (row *Row) AnnotateClients(remote map[string]*api.Annotations) error {
row.clientAnn = remote[row.GetClientIPs()[0]]
return nil
}

func (row *Row) AnnotateServer(local *api.GeoData) error {
row.serverAnn = local
return nil
}

func (row *Row) GetLogTime() time.Time {
return time.Now()
}
Expand Down Expand Up @@ -71,10 +57,10 @@ func TestBase(t *testing.T) {

b := row.NewBase("test", ins, 10)

b.Put(&Row{"1.2.3.4", "4.3.2.1", nil, nil})
b.Put(&Row{"1.2.3.4", "4.3.2.1"})

// Add a row with empty server IP
b.Put(&Row{"1.2.3.4", "", nil, nil})
b.Put(&Row{"1.2.3.4", ""})
b.Flush()
stats := b.GetStats()
if stats.Committed != 2 {
Expand All @@ -91,14 +77,14 @@ func TestAsyncPut(t *testing.T) {

b := row.NewBase("test", ins, 1)

b.Put(&Row{"1.2.3.4", "4.3.2.1", nil, nil})
b.Put(&Row{"1.2.3.4", "4.3.2.1"})

if b.GetStats().Committed != 0 {
t.Fatalf("Expected %d, Got %d.", 0, b.GetStats().Committed)
}

// This should trigger an async flush
b.Put(&Row{"1.2.3.4", "4.3.2.1", nil, nil})
b.Put(&Row{"1.2.3.4", "4.3.2.1"})
start := time.Now()
for time.Since(start) < 5*time.Second && b.GetStats().Committed < 1 {
time.Sleep(10 * time.Millisecond)
Expand Down
39 changes: 19 additions & 20 deletions schema/ndt_web100.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"github.com/m-lab/annotation-service/api"
"github.com/m-lab/go/cloud/bqx"
"github.com/m-lab/uuid-annotator/annotator"
)
Expand Down Expand Up @@ -35,25 +34,25 @@ type ndtAnomalies struct {
}

type ndtConnectionSpec struct {
ClientAF int64 `bigquery:"client_af"`
ClientApplication string `bigquery:"client_application"`
ClientBrowser string `bigquery:"client_browser"`
ClientHostname string `bigquery:"client_hostname"`
ClientIP string `bigquery:"client_ip"`
ClientKernelVersion string `bigquery:"client_kernel_version"`
ClientOS string `bigquery:"client_os"`
ClientVersion string `bigquery:"client_version"`
DataDirection int64 `bigquery:"data_direction"`
ServerAF int64 `bigquery:"server_af"`
ServerHostname string `bigquery:"server_hostname"`
ServerIP string `bigquery:"server_ip"`
ServerKernelVersion string `bigquery:"server_kernel_version"`
TLS bool `bigquery:"tls"`
Websockets bool `bigquery:"websockets"`
ClientGeolocation api.GeolocationIP `bigquery:"client_geolocation"`
ServerGeolocation api.GeolocationIP `bigquery:"server_geolocation"`
Client ndtClientNetwork `bigquery:"client"`
Server ndtServerNetwork `bigquery:"server"`
ClientAF int64 `bigquery:"client_af"`
ClientApplication string `bigquery:"client_application"`
ClientBrowser string `bigquery:"client_browser"`
ClientHostname string `bigquery:"client_hostname"`
ClientIP string `bigquery:"client_ip"`
ClientKernelVersion string `bigquery:"client_kernel_version"`
ClientOS string `bigquery:"client_os"`
ClientVersion string `bigquery:"client_version"`
DataDirection int64 `bigquery:"data_direction"`
ServerAF int64 `bigquery:"server_af"`
ServerHostname string `bigquery:"server_hostname"`
ServerIP string `bigquery:"server_ip"`
ServerKernelVersion string `bigquery:"server_kernel_version"`
TLS bool `bigquery:"tls"`
Websockets bool `bigquery:"websockets"`
ClientGeolocation LegacyGeolocationIP `bigquery:"client_geolocation"`
ServerGeolocation LegacyGeolocationIP `bigquery:"server_geolocation"`
Client ndtClientNetwork `bigquery:"client"`
Server ndtServerNetwork `bigquery:"server"`

// ServerX and ClientX are for the synthetic UUID annotator export process.
ServerX annotator.ServerAnnotations
Expand Down
57 changes: 52 additions & 5 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"reflect"
"time"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/go/cloud/bqx"
)

Expand All @@ -28,17 +27,17 @@ type ServerInfo struct {
Port uint16
IATA string

Geo *api.GeolocationIP
Network *api.ASData // NOTE: dominant ASN is available at top level.
Geo *LegacyGeolocationIP
Network *LegacyASData // NOTE: dominant ASN is available at top level.
}

// ClientInfo details various kinds of information about the client.
type ClientInfo struct {
IP string
Port uint16

Geo *api.GeolocationIP
Network *api.ASData // NOTE: dominant ASN is available at top level.
Geo *LegacyGeolocationIP
Network *LegacyASData // NOTE: dominant ASN is available at top level.
}

// ParseInfoV0 provides details about the parsing of this row.
Expand All @@ -49,6 +48,54 @@ type ParseInfoV0 struct {
Filename string
}

/*************************************************************************
* DEPRECATED: Annotation Structs *
*************************************************************************/

// LegacyGeolocationIP preserves the schema for existing v1 datatype schemas. It should not be used for new datatypes.
// Deprecated: v1 annotation-service schema, preserved for backward compatibility. Do not reuse.
type LegacyGeolocationIP struct {
ContinentCode string `json:"continent_code,,omitempty" bigquery:"continent_code"` // Gives a shorthand for the continent
CountryCode string `json:"country_code,,omitempty" bigquery:"country_code"` // Gives a shorthand for the country
CountryCode3 string `json:"country_code3,,omitempty" bigquery:"country_code3"` // Gives a shorthand for the country
CountryName string `json:"country_name,,omitempty" bigquery:"country_name"` // Name of the country
Region string `json:"region,,omitempty" bigquery:"region"` // Region or State within the country
Subdivision1ISOCode string `json:",omitempty"` // ISO3166-2 first-level country subdivision ISO code
Subdivision1Name string `json:",omitempty"` // ISO3166-2 first-level country subdivision name
Subdivision2ISOCode string `json:",omitempty"` // ISO3166-2 second-level country subdivision ISO code
Subdivision2Name string `json:",omitempty"` // ISO3166-2 second-level country subdivision name
MetroCode int64 `json:"metro_code,,omitempty" bigquery:"metro_code"` // Metro code within the country
City string `json:"city,,omitempty" bigquery:"city"` // City within the region
AreaCode int64 `json:"area_code,,omitempty" bigquery:"area_code"` // Area code, similar to metro code
PostalCode string `json:"postal_code,,omitempty" bigquery:"postal_code"` // Postal code, again similar to metro
Latitude float64 `json:"latitude,,omitempty" bigquery:"latitude"` // Latitude
Longitude float64 `json:"longitude,,omitempty" bigquery:"longitude"` // Longitude
AccuracyRadiusKm int64 `json:"radius,,omitempty" bigquery:"radius"` // Accuracy Radius (geolite2 from 2018)

Missing bool `json:",omitempty"` // True when the Geolocation data is missing from MaxMind.
}

type LegacySystem struct {
// ASNs contains a single ASN, or AS set. There must always be at least one ASN.
// If there are more than one ASN, they are (arbitrarily) listed in increasing numerical order.
ASNs []uint32
}

// LegacyASData preserves the schema for existing v1 datatype schemas. It should not be used for new datatypes.
// Deprecated: v1 annotation-service schema, preserved for backward compatibility. Do not reuse.
type LegacyASData struct {
IPPrefix string `json:",omitempty"` // the IP prefix found in the table.
CIDR string `json:",omitempty"` // The IP prefix found in the RouteViews data.
ASNumber uint32 `json:",omitempty"` // First AS number.
ASName string `json:",omitempty"` // AS name for that number, data from IPinfo.io
Missing bool `json:",omitempty"` // True when the ASN data is missing from RouteViews.

// One or more "Systems". There must always be at least one System. If there are more than one,
// then this is a Multi-Origin AS, and the component Systems are in order of frequency in routing tables,
// most common first.
Systems []LegacySystem `json:",omitempty"`
}

// FindSchemaDocsFor should be used by parser row types to associate bigquery
// field descriptions with a schema generated from a row type.
func FindSchemaDocsFor(value interface{}) []bqx.SchemaDoc {
Expand Down

0 comments on commit fa4740f

Please sign in to comment.