Skip to content

Commit

Permalink
revert to atomic.Value
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Aug 11, 2023
1 parent 6b9f2ed commit d204195
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
8 changes: 4 additions & 4 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ManualReader struct {

mu sync.Mutex
isShutdown bool
externalProducers []Producer
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
Expand All @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader {
r := &ManualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
externalProducers: cfg.producers,
}
r.externalProducers.Store(cfg.producers)
return r
}

Expand Down Expand Up @@ -88,7 +88,7 @@ func (mr *ManualReader) Shutdown(context.Context) error {
defer mr.mu.Unlock()
mr.isShutdown = true
// release references to Producer(s)
mr.externalProducers = nil
mr.externalProducers.Store([]Producer{})
err = nil
})
return err
Expand Down Expand Up @@ -126,7 +126,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
return err
}
var errs []error
for _, producer := range mr.externalProducers {
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
Expand Down
20 changes: 10 additions & 10 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
externalProducers: conf.producers,
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
rmPool: sync.Pool{
New: func() interface{} {
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store(conf.producers)

go func() {
defer func() { close(r.done) }()
Expand All @@ -147,7 +147,7 @@ type PeriodicReader struct {

mu sync.Mutex
isShutdown bool
externalProducers []Producer
externalProducers atomic.Value

interval time.Duration
timeout time.Duration
Expand Down Expand Up @@ -263,7 +263,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd
return err
}
var errs []error
for _, producer := range r.externalProducers {
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
defer r.mu.Unlock()
r.isShutdown = true
// release references to Producer(s)
r.externalProducers = nil
r.externalProducers.Store([]Producer{})
})
return err
}
Expand Down

0 comments on commit d204195

Please sign in to comment.