Skip to content

Commit

Permalink
Add processor implementation for HopAnnotation1 datatype (#26)
Browse files Browse the repository at this point in the history
* HopAnnotation1 processing and tests
* Add traceroute-caller as dependency
  • Loading branch information
stephen-soltesz committed Apr 21, 2023
1 parent 6187e53 commit 6e0d0c0
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ require (
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/m-lab/tcp-info v1.5.3 // indirect
github.com/m-lab/traceroute-caller v0.11.2
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/oschwald/geoip2-golang v1.7.0 // indirect
github.com/oschwald/maxminddb-golang v1.9.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/xattr v0.4.9 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ github.com/m-lab/go v0.1.60 h1:XdXpyfER1WZTXRMK0ZC3lo6W3zW2BAAZeKO9l+kjqHM=
github.com/m-lab/go v0.1.60/go.mod h1:O1D/EoVarJ8lZt9foANcqcKtwxHatBzUxXFFyC87aQQ=
github.com/m-lab/tcp-info v1.5.3 h1:4IspTPcNc8D8LNRvuFnID8gDiz+hxPAtYvpKZaiGGe8=
github.com/m-lab/tcp-info v1.5.3/go.mod h1:bkvI4qbjB6QVC2tsLSHqf5OnIYcmuLEVjo7+8YA56Kg=
github.com/m-lab/traceroute-caller v0.11.2 h1:Ti5Dr4/ZzzmSxHvvJD/9/GCsK8iXS5AsEv5eZrQueTE=
github.com/m-lab/traceroute-caller v0.11.2/go.mod h1:GNlXnJEwxR+ZBolxNuFRRqjHpc43o9P0qfNAwQcyXvc=
github.com/m-lab/uuid-annotator v0.5.1 h1:hfFWEcETqu90vNTimlVT7iKBKz5Q6uZDxPd/0CpH9l0=
github.com/m-lab/uuid-annotator v0.5.1/go.mod h1:ny5OEGTaeRVjgW7FqsfWiJMRmdq2q8O3uc8/0xmTJDU=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
Expand All @@ -233,6 +235,10 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oschwald/geoip2-golang v1.7.0 h1:JW1r5AKi+vv2ujSxjKthySK3jo8w8oKWPyXsw+Qs/S8=
github.com/oschwald/geoip2-golang v1.7.0/go.mod h1:mdI/C7iK7NVMcIDDtf4bCKMJ7r0o7UwGeCo9eiitCMQ=
github.com/oschwald/maxminddb-golang v1.9.0 h1:tIk4nv6VT9OiPyrnDAfJS1s1xKDQMZOsGojab6EjC1Y=
github.com/oschwald/maxminddb-golang v1.9.0/go.mod h1:TK+s/Z2oZq0rSl4PSeAEoP0bgm82Cp5HyvYbt8K3zLY=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
134 changes: 134 additions & 0 deletions internal/annotation/process_hops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package annotation

import (
"archive/tar"
"context"
"encoding/json"
"log"
"net"
"net/url"
"strings"
"time"

"github.com/m-lab/archive-repacker/archive"
"github.com/m-lab/archive-repacker/internal/process"
"github.com/m-lab/archive-repacker/routeview"
"github.com/m-lab/go/content"
"github.com/m-lab/go/rtx"
"github.com/m-lab/traceroute-caller/hopannotation"
"github.com/m-lab/uuid-annotator/asnannotator"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"cloud.google.com/go/storage"
)

// repackerHopFileUnparsable counts hop annotation files whose names could not
// be split into the expected fields, so the hop IP could not be identified.
var repackerHopFileUnparsable = promauto.NewCounter(prometheus.CounterOpts{
	Name: "repacker_hop_file_unparsable_total",
	Help: "The number of hop annotation files that could not be parsed",
})

// HopProcessor maintains state for reprocessing hop annotation archives.
// Most fields are set once by NewHopProcessor; asn is assigned by Init and src
// is assigned by Source.
type HopProcessor struct {
asn asnannotator.ASNAnnotator // ASN annotator; assigned by Init.
rv4 *url.URL // IPv4 routeview prefix2as dataset.
rv6 *url.URL // IPv6 routeview prefix2as dataset.
names *url.URL // asname dataset.
src *archive.Source // source archive; assigned by Source and read by Finish.
outBucket string // output GCS bucket.
client *storage.Client // storage client passed to the routeview, source and upload calls.
}

// NewHopProcessor returns a HopProcessor holding the given storage client,
// output bucket name, and the routeview (IPv4 / IPv6) and AS name dataset
// URLs. The ASN annotator and source archive are left unset; they are
// assigned later by Init and Source respectively.
func NewHopProcessor(client *storage.Client, outBucket string, rv4, rv6, asnames *url.URL) *HopProcessor {
	p := new(HopProcessor)
	p.client = client
	p.outBucket = outBucket
	p.rv4, p.rv6 = rv4, rv6
	p.names = asnames
	return p
}

// Init prepares the ASN annotator for the given date. It resolves the IPv4 and
// IPv6 routeview dataset URLs for date, loads the AS names dataset, and stores
// a new ASN annotator built from them on the processor.
//
// Init has no error return: every failure is handed to rtx.Must.
func (p *HopProcessor) Init(ctx context.Context, date string) {
	// Resolve the routeview URLs for the requested date, IPv4 first.
	v4URL, err := routeview.NewURLGenerator(p.client, p.rv4.String()).Next(ctx, date)
	rtx.Must(err, "Could not generate routeview v4 URL")

	v6URL, err := routeview.NewURLGenerator(p.client, p.rv6.String()).Next(ctx, date)
	rtx.Must(err, "Could not generate routeview v6 URL")

	// Load the AS names dataset.
	names, err := content.FromURL(ctx, p.names)
	rtx.Must(err, "Could not load AS names URL")

	// Build the annotator from providers that wrap each resolved URL. No
	// local IPs are supplied.
	p.asn = asnannotator.New(
		ctx,
		&gcsProvider{Path: &archive.Path{URL: v4URL}, Client: p.client},
		&gcsProvider{Path: &archive.Path{URL: v6URL}, Client: p.client},
		names,
		[]net.IP{},
	)
}

// Source creates an archive.Source for row.ArchiveURL, records it on the
// processor so that Finish can derive the output path from it, and returns it.
// A failure to create the source is handed to rtx.Must.
func (p *HopProcessor) Source(ctx context.Context, row Result) *archive.Source {
	log.Println("Starting", row.ArchiveURL)

	s, err := archive.NewGCSSource(ctx, p.client, row.ArchiveURL)
	rtx.Must(err, "failed to create new source for %s", row.ArchiveURL)

	p.src = s
	return p.src
}

// File re-annotates a single hop annotation file taken from the archive.
//
// b is decoded as a hopannotation.HopAnnotation1. The hop IP is read from the
// third "_"-separated field of the file name (h.Name without its trailing
// ".json" extension) and the Network annotation is regenerated from that IP.
//
// File returns:
//   - nil and process.ErrCorrupt when b cannot be decoded;
//   - b unchanged and a nil error when the file name does not consist of
//     exactly three "_"-separated fields;
//   - the re-serialized annotation and a nil error otherwise.
//
// A failure to marshal the updated annotation is handed to rtx.Must.
func (p *HopProcessor) File(h *tar.Header, b []byte) ([]byte, error) {
	// Parse annotation.
	an := hopannotation.HopAnnotation1{}
	if err := json.Unmarshal(b, &an); err != nil {
		log.Println("Error Unmarshal file:", h.Name, err)
		repackerQueryFilesCorrupt.Inc()
		// Since file is corrupt, do not add to output.
		return nil, process.ErrCorrupt
	}

	// Strip only the trailing extension. Removing every ".json" occurrence
	// would also alter a name that happens to contain ".json" elsewhere and
	// could corrupt the IP field.
	fields := strings.Split(strings.TrimSuffix(h.Name, ".json"), "_")
	if len(fields) != 3 {
		// We cannot parse this filename to identify the IP.
		log.Println("Skipping unparsable filename:", h.Name)
		repackerHopFileUnparsable.Inc()
		return b, nil
	}

	// NOTE(review): if Annotations is a pointer that the input may leave nil,
	// this dereference would panic — confirm against HopAnnotation1.
	before := an.Annotations.Network
	// Recreate the Network annotation using the hop IP from the file name.
	an.Annotations.Network = p.asn.AnnotateIP(fields[2])

	// Track how frequently the annotation was previously missing or updated.
	switch {
	case before == nil || before.Missing:
		repackerAnnotations.WithLabelValues("was-missed").Inc()
	case before.ASNumber != an.Annotations.Network.ASNumber:
		repackerAnnotations.WithLabelValues("asn-update").Inc()
	default:
		repackerAnnotations.WithLabelValues("equal").Inc()
	}

	// Serialize annotation again.
	out, err := json.Marshal(an)
	rtx.Must(err, "failed to marshal new annotation")
	return out, nil
}

// Finish uploads the completed output archive. The destination is obtained by
// calling Dup on the source archive's path with the configured output bucket,
// so Source must have been called first. The upload runs under a context
// derived from ctx with a ten minute timeout, and the error from Upload is
// returned as-is.
func (p *HopProcessor) Finish(ctx context.Context, out *archive.Target) error {
	const uploadTimeout = 10 * time.Minute

	uploadCtx, cancel := context.WithTimeout(ctx, uploadTimeout)
	defer cancel()

	return out.Upload(uploadCtx, p.client, p.src.Path.Dup(p.outBucket))
}

0 comments on commit 6e0d0c0

Please sign in to comment.