Skip to content

Commit

Permalink
Upgrade otel (#229)
Browse files (browse the repository at this point in the history)
  • Loading branch information
dokterbob committed Sep 27, 2022
1 parent 0a99d93 commit 0cf9f86
Show file tree
Hide file tree
Showing 30 changed files with 230 additions and 190 deletions.
2 changes: 1 addition & 1 deletion commands/add.go
Expand Up @@ -20,7 +20,7 @@ func AddHash(ctx context.Context, cfg *config.Config, hash string) error {
if err != nil {
return err
}
defer instFlusher()
defer instFlusher(ctx)

i := instr.New()

Expand Down
2 changes: 1 addition & 1 deletion commands/crawl.go
Expand Up @@ -16,7 +16,7 @@ func Crawl(ctx context.Context, cfg *config.Config) error {
if err != nil {
log.Fatal(err)
}
defer instFlusher()
defer instFlusher(ctx)

i := instr.New()

Expand Down
7 changes: 2 additions & 5 deletions components/crawler/crawldirectory.go
Expand Up @@ -8,9 +8,6 @@ import (

"golang.org/x/sync/errgroup"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"

indexTypes "github.com/ipfs-search/ipfs-search/components/index/types"
t "github.com/ipfs-search/ipfs-search/types"
)
Expand Down Expand Up @@ -116,7 +113,7 @@ func (c *Crawler) processDirEntries(ctx context.Context, entries <-chan *t.Annot

// Only add to properties up to limit (preventing oversized directory entries) - but queue entries nonetheless.
if dirCnt == c.config.MaxDirSize {
span.AddEvent(ctx, "large-directory")
span.AddEvent("large-directory")
log.Printf("Directory %v is large, crawling entries but not directory itself.", entry.Parent)
isLarge = true
}
Expand Down Expand Up @@ -151,7 +148,7 @@ func (c *Crawler) processDirEntries(ctx context.Context, entries <-chan *t.Annot
}

if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
}

return err
Expand Down
19 changes: 9 additions & 10 deletions components/crawler/crawler.go
Expand Up @@ -6,9 +6,8 @@ import (
"errors"
"log"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs-search/ipfs-search/components/extractor"
"github.com/ipfs-search/ipfs-search/components/protocol"
Expand Down Expand Up @@ -40,7 +39,7 @@ func isSupportedType(rType t.ResourceType) bool {
// Crawl updates existing or crawls new resources, extracting metadata where applicable.
func (c *Crawler) Crawl(ctx context.Context, r *t.AnnotatedResource) error {
ctx, span := c.Tracer.Start(ctx, "crawler.Crawl",
trace.WithAttributes(label.String("cid", r.ID)),
trace.WithAttributes(attribute.String("cid", r.ID)),
)
defer span.End()

Expand All @@ -59,36 +58,36 @@ func (c *Crawler) Crawl(ctx context.Context, r *t.AnnotatedResource) error {

exists, err := c.updateMaybeExisting(ctx, r)
if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
return err
}

if exists {
log.Printf("Not updating existing resource %v", r)
span.AddEvent(ctx, "Not updating existing resource")
span.AddEvent("Not updating existing resource")
return nil
}

if err := c.ensureType(ctx, r); err != nil {
if errors.Is(err, t.ErrInvalidResource) {
// Resource is invalid, index as such, throwing away ErrInvalidResource in favor of the result of indexing operation.
log.Printf("Indexing invalid resource %v", r)
span.AddEvent(ctx, "Indexing invalid resource")
span.AddEvent("Indexing invalid resource")

err = c.indexInvalid(ctx, r, err)
}

// Errors from ensureType imply that no type could be found, hence we can't index.
if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
}
return err
}

log.Printf("Indexing new item %v", r)
err = c.index(ctx, r)
if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
}
return err
}
Expand Down Expand Up @@ -117,7 +116,7 @@ func (c *Crawler) ensureType(ctx context.Context, r *t.AnnotatedResource) error

err = c.protocol.Stat(ctx, r)
if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
}
}

Expand Down
12 changes: 6 additions & 6 deletions components/crawler/index.go
Expand Up @@ -7,8 +7,8 @@ import (
"log"
"time"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs-search/ipfs-search/components/extractor"
"github.com/ipfs-search/ipfs-search/components/index"
Expand Down Expand Up @@ -63,7 +63,7 @@ func (c *Crawler) getFileProperties(ctx context.Context, r *t.AnnotatedResource)
err = e.Extract(ctx, r, properties)
if errors.Is(err, extractor.ErrFileTooLarge) {
// Interpret files which are too large as invalid resources; prevent repeated attempts.
span.RecordError(ctx, err)
span.RecordError(err)
return nil, fmt.Errorf("%w: %v", t.ErrInvalidResource, err)
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (c *Crawler) getProperties(ctx context.Context, r *t.AnnotatedResource) (in
case t.UnsupportedType:
// Index unsupported items as invalid.
err = t.ErrUnsupportedType
span.RecordError(ctx, err)
span.RecordError(err)

return nil, nil, err

Expand All @@ -117,7 +117,7 @@ func (c *Crawler) getProperties(ctx context.Context, r *t.AnnotatedResource) (in

func (c *Crawler) index(ctx context.Context, r *t.AnnotatedResource) error {
ctx, span := c.Tracer.Start(ctx, "crawler.index",
trace.WithAttributes(label.Stringer("type", r.Type)),
trace.WithAttributes(attribute.Stringer("type", r.Type)),
)
defer span.End()

Expand All @@ -126,7 +126,7 @@ func (c *Crawler) index(ctx context.Context, r *t.AnnotatedResource) error {
if err != nil {
if errors.Is(err, t.ErrInvalidResource) {
log.Printf("Indexing invalid '%v', err: %v", r, err)
span.RecordError(ctx, err)
span.RecordError(err)
return c.indexInvalid(ctx, r, err)
}

Expand Down
26 changes: 14 additions & 12 deletions components/crawler/update.go
Expand Up @@ -6,7 +6,8 @@ import (
"log"
"time"

"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

index_types "github.com/ipfs-search/ipfs-search/components/index/types"
t "github.com/ipfs-search/ipfs-search/types"
Expand Down Expand Up @@ -42,10 +43,11 @@ func (c *Crawler) updateExisting(ctx context.Context, i *existingItem) error {
refs, refsUpdated := appendReference(i.References, &i.AnnotatedResource.Reference)

if refsUpdated {
span.AddEvent(ctx, "Updating",
label.String("reason", "reference-added"),
label.Any("new-reference", i.AnnotatedResource.Reference),
)
span.AddEvent("Updating",
trace.WithAttributes(
attribute.String("reason", "reference-added"),
attribute.Stringer("new-reference", &i.AnnotatedResource.Reference),
))

return i.Index.Update(ctx, i.AnnotatedResource.ID, &index_types.Update{
References: refs,
Expand All @@ -70,11 +72,11 @@ func (c *Crawler) updateExisting(ctx context.Context, i *existingItem) error {
}

if isRecent {
span.AddEvent(ctx, "Updating",
label.String("reason", "is-recent"),
// TODO: This causes a panic when LastSeen is nil.
// label.Stringer("last-seen", i.LastSeen),
)
span.AddEvent("Updating",
trace.WithAttributes(
attribute.String("reason", "is-recent")))
// TODO: This causes a panic when LastSeen is nil.
// attribute.Stringer("last-seen", i.LastSeen),

return i.Index.Update(ctx, i.AnnotatedResource.ID, &index_types.Update{
LastSeen: &now,
Expand All @@ -89,7 +91,7 @@ func (c *Crawler) updateExisting(ctx context.Context, i *existingItem) error {
panic(fmt.Sprintf("Unexpected source %s for item %+v", i.Source, i))
}

span.AddEvent(ctx, "Not updating")
span.AddEvent("Not updating")

return nil
}
Expand Down Expand Up @@ -147,7 +149,7 @@ func (c *Crawler) updateMaybeExisting(ctx context.Context, r *t.AnnotatedResourc
// Process existing item
if existing != nil {
if span.IsRecording() {
span.AddEvent(ctx, "existing", label.Any("index", existing.Index))
span.AddEvent("existing") //, trace.WithAttributes(attribute.Stringer("index", existing.Index)))
}

return c.processExisting(ctx, existing)
Expand Down
15 changes: 7 additions & 8 deletions components/crawler/worker/pool.go
Expand Up @@ -11,8 +11,7 @@ import (

samqp "github.com/rabbitmq/amqp091-go"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs-search/ipfs-search/components/crawler"
"github.com/ipfs-search/ipfs-search/components/extractor"
Expand Down Expand Up @@ -183,13 +182,13 @@ func (w *Pool) crawlDelivery(ctx context.Context, d samqp.Delivery) error {
}

if err := json.Unmarshal(d.Body, r); err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
return err
}

if !r.IsValid() {
err := fmt.Errorf("Invalid resource: %v", r)
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
return err
}

Expand All @@ -198,7 +197,7 @@ func (w *Pool) crawlDelivery(ctx context.Context, d samqp.Delivery) error {
log.Printf("Done crawling '%s', result: %v", r, err)

if err != nil {
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
}

return err
Expand All @@ -221,14 +220,14 @@ func (w *Pool) startWorker(ctx context.Context, deliveries <-chan samqp.Delivery
// By default, do not retry.
shouldRetry := false

span.RecordError(ctx, err)
span.RecordError(err)

if err := d.Reject(shouldRetry); err != nil {
span.RecordError(ctx, err)
span.RecordError(err)
}
} else {
if err := d.Ack(false); err != nil {
span.RecordError(ctx, err)
span.RecordError(err)
}
}
}
Expand Down
21 changes: 5 additions & 16 deletions components/extractor/nsfw/extractor.go
Expand Up @@ -7,11 +7,7 @@ import (
"log"
"regexp"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"

"github.com/ipfs-search/ipfs-search/components/extractor"

indexTypes "github.com/ipfs-search/ipfs-search/components/index/types"
"github.com/ipfs-search/ipfs-search/instr"
t "github.com/ipfs-search/ipfs-search/types"
Expand Down Expand Up @@ -80,13 +76,16 @@ func isCompatible(r *t.AnnotatedResource, f *indexTypes.File) bool {
return matchOne(contentType, compatibleMimes)
}


// Extract metadata from a (potentially) referenced resource, updating
// Metadata or returning an error.
func (e *Extractor) Extract(ctx context.Context, r *t.AnnotatedResource, m interface{}) error {
ctx, span := e.Tracer.Start(ctx, "extractor.nsfw_server.Extract")
defer span.End()

if err := extractor.ValidateMaxSize(ctx, r, e.config.MaxFileSize); err != nil {
return err
}

// Timeout if extraction hasn't fully completed within this time.
ctx, cancel := context.WithTimeout(ctx, e.config.RequestTimeout)
defer cancel()
Expand All @@ -98,16 +97,6 @@ func (e *Extractor) Extract(ctx context.Context, r *t.AnnotatedResource, m inter
return nil
}

if r.Size > uint64(e.config.MaxFileSize) {
err := fmt.Errorf("%w: %d", extractor.ErrFileTooLarge, r.Size)
span.RecordError(
ctx, extractor.ErrFileTooLarge, trace.WithErrorStatus(codes.Error),
// TODO: Enable after otel upgrade.
// label.Int64("file.size", r.Size),
)
return err
}

body, err := e.getter.GetBody(ctx, e.getExtractURL(r), 200)
if err != nil {
return err
Expand All @@ -117,7 +106,7 @@ func (e *Extractor) Extract(ctx context.Context, r *t.AnnotatedResource, m inter
var nsfwData indexTypes.NSFW
if err := json.NewDecoder(body).Decode(&nsfwData); err != nil {
err := fmt.Errorf("%w: decoding error %s", t.ErrUnexpectedResponse, err)
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
return err
}

Expand Down
22 changes: 6 additions & 16 deletions components/extractor/tika/extractor.go
Expand Up @@ -7,12 +7,8 @@ import (
"log"
"net/url"

"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"

"github.com/ipfs-search/ipfs-search/components/extractor"
"github.com/ipfs-search/ipfs-search/components/protocol"

"github.com/ipfs-search/ipfs-search/instr"
t "github.com/ipfs-search/ipfs-search/types"
"github.com/ipfs-search/ipfs-search/utils"
Expand All @@ -21,7 +17,7 @@ import (
// Extractor extracts metadata using the ipfs-tika server.
type Extractor struct {
config *Config
getter utils.HTTPBodyGetter
getter utils.HTTPBodyGetter
protocol protocol.Protocol

*instr.Instrumentation
Expand All @@ -38,20 +34,14 @@ func (e *Extractor) Extract(ctx context.Context, r *t.AnnotatedResource, m inter
ctx, span := e.Tracer.Start(ctx, "extractor.tika.Extract")
defer span.End()

if err := extractor.ValidateMaxSize(ctx, r, e.config.MaxFileSize); err != nil {
return err
}

// Timeout if extraction hasn't fully completed within this time.
ctx, cancel := context.WithTimeout(ctx, e.config.RequestTimeout)
defer cancel()

if r.Size > uint64(e.config.MaxFileSize) {
err := fmt.Errorf("%w: %d", extractor.ErrFileTooLarge, r.Size)
span.RecordError(
ctx, extractor.ErrFileTooLarge, trace.WithErrorStatus(codes.Error),
// TODO: Enable after otel upgrade.
// label.Int64("file.size", r.Size),
)
return err
}

body, err := e.getter.GetBody(ctx, e.getExtractURL(r), 200)
if err != nil {
return err
Expand All @@ -61,7 +51,7 @@ func (e *Extractor) Extract(ctx context.Context, r *t.AnnotatedResource, m inter
// Parse resulting JSON
if err := json.NewDecoder(body).Decode(m); err != nil {
err := fmt.Errorf("%w: %v", t.ErrUnexpectedResponse, err)
span.RecordError(ctx, err, trace.WithErrorStatus(codes.Error))
span.RecordError(err)
return err
}

Expand Down

0 comments on commit 0cf9f86

Please sign in to comment.