Skip to content

blob: migration of blob to use Opentelemetry #3548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 68 additions & 88 deletions blob/blob.go
Original file line number Diff line number Diff line change
@@ -31,12 +31,12 @@
// The Bucket.ErrorAs method can retrieve the driver error underlying the returned
// error.
//
// # OpenCensus Integration
// # OpenTelemetry Integration
//
// OpenCensus supports tracing and metric collection for multiple languages and
// backend providers. See https://opencensus.io.
// OpenTelemetry supports tracing, metrics, and logs collection for multiple languages and
// backend providers. See https://opentelemetry.io.
//
// This API collects OpenCensus traces and metrics for the following methods:
// This API collects OpenTelemetry traces and metrics for the following methods:
// - Attributes
// - Copy
// - Delete
@@ -57,10 +57,10 @@
// - gocloud.dev/blob/bytes_read: the total number of bytes read, by driver.
// - gocloud.dev/blob/bytes_written: the total number of bytes written, by driver.
//
// To enable trace collection in your application, see "Configure Exporter" at
// https://opencensus.io/quickstart/go/tracing.
// To enable metric collection in your application, see "Exporting stats" at
// https://opencensus.io/quickstart/go/metrics.
// To enable trace collection in your application, see the documentation at
// https://opentelemetry.io/docs/instrumentation/go/getting-started/.
// To enable metric collection in your application, see the documentation at
// https://opentelemetry.io/docs/instrumentation/go/manual/.
package blob // import "gocloud.dev/blob"

import (
@@ -80,14 +80,12 @@ import (
"time"
"unicode/utf8"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/metric"
"gocloud.dev/blob/driver"
"gocloud.dev/gcerrors"
"gocloud.dev/internal/gcerr"
"gocloud.dev/internal/oc"
"gocloud.dev/internal/openurl"
gcdkotel "gocloud.dev/internal/otel"
)

// Ensure that Reader implements io.ReadSeekCloser.
@@ -108,7 +106,7 @@ type Reader struct {
savedOffset int64 // Last relativeOffset for r, saved after relativeOffset is changed in Seek, or -1 if no Seek.
end func(error) // Called at Close to finish trace and metric collection.
// for metric collection;
statsTagMutators []tag.Mutator
bytesReadCounter metric.Int64Counter
bytesRead int
closed bool
}
@@ -200,10 +198,12 @@ func (r *Reader) Close() error {
err := wrapError(r.b, r.r.Close(), r.key)
r.end(err)
// Emit only on close to avoid an allocation on each call to Read().
stats.RecordWithTags(
context.Background(),
r.statsTagMutators,
bytesReadMeasure.M(int64(r.bytesRead)))
// Record bytes read metric with OpenTelemetry
if r.bytesReadCounter != nil && r.bytesRead > 0 {
r.bytesReadCounter.Add(
r.ctx,
int64(r.bytesRead))
}
return err
}

@@ -358,16 +358,18 @@ func (a *Attributes) As(i any) bool {
// It implements io.WriteCloser (https://golang.org/pkg/io/#Closer), and must be
// closed after all writes are done.
type Writer struct {
b driver.Bucket
w driver.Writer
key string
end func(error) // called at Close to finish trace and metric collection
cancel func() // cancels the ctx provided to NewTypedWriter if contentMD5 verification fails
contentMD5 []byte
md5hash hash.Hash
statsTagMutators []tag.Mutator // for metric collection
bytesWritten int
closed bool
b driver.Bucket
w driver.Writer
key string
end func(err error) // called at Close to finish trace and metric collection
cancel func() // cancels the ctx provided to NewTypedWriter if contentMD5 verification fails
contentMD5 []byte
md5hash hash.Hash

// Metric collection fields
bytesWrittenCounter metric.Int64Counter
bytesWritten int
closed bool

// These fields are non-zero values only when w is nil (not yet created).
//
@@ -434,10 +436,12 @@ func (w *Writer) Close() (err error) {
defer func() {
w.end(err)
// Emit only on close to avoid an allocation on each call to Write().
stats.RecordWithTags(
context.Background(),
w.statsTagMutators,
bytesWrittenMeasure.M(int64(w.bytesWritten)))
// Record bytes written metric with OpenTelemetry
if w.bytesWrittenCounter != nil && w.bytesWritten > 0 {
w.bytesWrittenCounter.Add(
w.ctx,
int64(w.bytesWritten))
}
}()
if len(w.contentMD5) > 0 {
// Verify the MD5 hash of what was written matches the ContentMD5 provided
@@ -642,7 +646,10 @@ func (o *ListObject) As(i any) bool {
// To create a Bucket, use constructors found in driver subpackages.
type Bucket struct {
b driver.Bucket
tracer *oc.Tracer
tracer *gcdkotel.Tracer

bytesReadCounter metric.Int64Counter
bytesWrittenCounter metric.Int64Counter

// ioFSCallback is set via SetIOFSCallback, which must be
// called before calling various functions implementing interfaces
@@ -658,48 +665,21 @@ type Bucket struct {

const pkgName = "gocloud.dev/blob"

var (
latencyMeasure = oc.LatencyMeasure(pkgName)
bytesReadMeasure = stats.Int64(pkgName+"/bytes_read", "Total bytes read", stats.UnitBytes)
bytesWrittenMeasure = stats.Int64(pkgName+"/bytes_written", "Total bytes written", stats.UnitBytes)

// OpenCensusViews are predefined views for OpenCensus metrics.
// The views include counts and latency distributions for API method calls,
// and total bytes read and written.
// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
OpenCensusViews = append(
oc.Views(pkgName, latencyMeasure),
&view.View{
Name: pkgName + "/bytes_read",
Measure: bytesReadMeasure,
Description: "Sum of bytes read from the service.",
TagKeys: []tag.Key{oc.ProviderKey},
Aggregation: view.Sum(),
},
&view.View{
Name: pkgName + "/bytes_written",
Measure: bytesWrittenMeasure,
Description: "Sum of bytes written to the service.",
TagKeys: []tag.Key{oc.ProviderKey},
Aggregation: view.Sum(),
})
)

// NewBucket is intended for use by drivers only. Do not use in application code.
var NewBucket = newBucket

// newBucket creates a new *Bucket based on a specific driver implementation.
// End users should use subpackages to construct a *Bucket instead of this
// function; see the package documentation for details.
func newBucket(b driver.Bucket) *Bucket {
providerName := gcdkotel.ProviderName(b)

return &Bucket{
b: b,
ioFSCallback: func() (context.Context, *ReaderOptions) { return context.Background(), nil },
tracer: &oc.Tracer{
Package: pkgName,
Provider: oc.ProviderName(b),
LatencyMeasure: latencyMeasure,
},
b: b,
ioFSCallback: func() (context.Context, *ReaderOptions) { return context.Background(), nil },
tracer: gcdkotel.NewTracer(pkgName, providerName),
bytesReadCounter: gcdkotel.BytesMeasure(pkgName, providerName, "/bytes_read", "Total bytes read from blob storage"),
bytesWrittenCounter: gcdkotel.BytesMeasure(pkgName, providerName, "/bytes_written", "Total bytes written to blob storage"),
}
}

@@ -827,8 +807,8 @@ func (b *Bucket) ListPage(ctx context.Context, pageToken []byte, pageSize int, o
return nil, nil, errClosed
}

ctx = b.tracer.Start(ctx, "ListPage")
defer func() { b.tracer.End(ctx, err) }()
ctx, span := b.tracer.Start(ctx, "ListPage")
defer func() { b.tracer.End(ctx, span, err) }()

dopts := &driver.ListOptions{
Prefix: opts.Prefix,
@@ -911,8 +891,8 @@ func (b *Bucket) Attributes(ctx context.Context, key string) (_ *Attributes, err
if b.closed {
return nil, errClosed
}
ctx = b.tracer.Start(ctx, "Attributes")
defer func() { b.tracer.End(ctx, err) }()
ctx, span := b.tracer.Start(ctx, "Attributes")
defer func() { b.tracer.End(ctx, span, err) }()

a, err := b.b.Attributes(ctx, key)
if err != nil {
@@ -987,20 +967,20 @@ func (b *Bucket) newRangeReader(ctx context.Context, key string, offset, length
dopts := &driver.ReaderOptions{
BeforeRead: opts.BeforeRead,
}
tctx := b.tracer.Start(ctx, "NewRangeReader")
ctx, span := b.tracer.Start(ctx, "NewRangeReader")
defer func() {
// If err == nil, we handed the end closure off to the returned *Reader; it
// will be called when the Reader is Closed.
if err != nil {
b.tracer.End(tctx, err)
b.tracer.End(ctx, span, err)
}
}()
var dr driver.Reader
dr, err = b.b.NewRangeReader(ctx, key, offset, length, dopts)
if err != nil {
return nil, wrapError(b.b, err, key)
}
end := func(err error) { b.tracer.End(tctx, err) }
end := func(err error) { b.tracer.End(ctx, span, err) }
r := &Reader{
b: b.b,
r: dr,
@@ -1011,7 +991,7 @@ func (b *Bucket) newRangeReader(ctx context.Context, key string, offset, length
baseLength: length,
savedOffset: -1,
end: end,
statsTagMutators: []tag.Mutator{tag.Upsert(oc.ProviderKey, b.tracer.Provider)},
bytesReadCounter: b.bytesReadCounter,
}
_, file, lineno, ok := runtime.Caller(2)
runtime.SetFinalizer(r, func(r *Reader) {
@@ -1129,22 +1109,23 @@ func (b *Bucket) NewWriter(ctx context.Context, key string, opts *WriterOptions)
return nil, errClosed
}
ctx, cancel := context.WithCancel(ctx)
tctx := b.tracer.Start(ctx, "NewWriter")
end := func(err error) { b.tracer.End(tctx, err) }
ctx, span := b.tracer.Start(ctx, "NewWriter")
end := func(err error) { b.tracer.End(ctx, span, err) }
defer func() {
if err != nil {
end(err)
}
}()

w := &Writer{
b: b.b,
end: end,
cancel: cancel,
key: key,
contentMD5: opts.ContentMD5,
md5hash: md5.New(),
statsTagMutators: []tag.Mutator{tag.Upsert(oc.ProviderKey, b.tracer.Provider)},
b: b.b,
end: end,
cancel: cancel,
key: key,
contentMD5: opts.ContentMD5,
md5hash: md5.New(),
bytesWrittenCounter: b.bytesWrittenCounter,
ctx: ctx,
}
if opts.ContentType != "" || opts.DisableContentTypeDetection {
var ct string
@@ -1165,7 +1146,6 @@ func (b *Bucket) NewWriter(ctx context.Context, key string, opts *WriterOptions)
} else {
// Save the fields needed to called NewTypedWriter later, once we've gotten
// sniffLen bytes; see the comment on Writer.
w.ctx = ctx
w.opts = dopts
w.buf = bytes.NewBuffer([]byte{})
}
@@ -1207,8 +1187,8 @@ func (b *Bucket) Copy(ctx context.Context, dstKey, srcKey string, opts *CopyOpti
if b.closed {
return errClosed
}
ctx = b.tracer.Start(ctx, "Copy")
defer func() { b.tracer.End(ctx, err) }()
ctx, span := b.tracer.Start(ctx, "Copy")
defer func() { b.tracer.End(ctx, span, err) }()
return wrapError(b.b, b.b.Copy(ctx, dstKey, srcKey, dopts), fmt.Sprintf("%s -> %s", srcKey, dstKey))
}

@@ -1225,8 +1205,8 @@ func (b *Bucket) Delete(ctx context.Context, key string) (err error) {
if b.closed {
return errClosed
}
ctx = b.tracer.Start(ctx, "Delete")
defer func() { b.tracer.End(ctx, err) }()
ctx, span := b.tracer.Start(ctx, "Delete")
defer func() { b.tracer.End(ctx, span, err) }()
return wrapError(b.b, b.b.Delete(ctx, key), key)
}

Loading
Oops, something went wrong.