Skip to content

Commit

Permalink
Restore the experimental/streaming SDK implementation (#55)
Browse files Browse the repository at this point in the history
* Fix streaming - part 1

* Eliminate span{} state

* Eliminate trace/ dir

* Avoid missing AddEvent helpers
  • Loading branch information
jmacd authored and rghetia committed Jul 17, 2019
1 parent 2c77e48 commit b26d667
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 187 deletions.
5 changes: 2 additions & 3 deletions example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/api/stats"
"go.opentelemetry.io/api/tag"
"go.opentelemetry.io/api/trace"
"go.opentelemetry.io/experimental/streaming/sdk/event"
)

var (
Expand Down Expand Up @@ -64,7 +63,7 @@ func main() {

err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error {

trace.CurrentSpan(ctx).AddEvent(ctx, event.WithAttr("Nice operation!", key.New("bogons").Int(100)))
trace.CurrentSpan(ctx).Event(ctx, "Nice operation!", key.New("bogons").Int(100))

trace.CurrentSpan(ctx).SetAttributes(anotherKey.String("yes"))

Expand All @@ -76,7 +75,7 @@ func main() {
func(ctx context.Context) error {
trace.CurrentSpan(ctx).SetAttribute(lemonsKey.String("five"))

trace.CurrentSpan(ctx).AddEvent(ctx, event.WithString("Format schmormat %d!", 100))
trace.CurrentSpan(ctx).Event(ctx, "Sub span event")

stats.Record(ctx, measureTwo.M(1.3))

Expand Down
13 changes: 10 additions & 3 deletions experimental/streaming/exporter/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,14 @@ func Foreach(f func(Observer)) {
}
}

func NewScope(parent ScopeID, kv ...core.KeyValue) ScopeID {
// TODO
return parent
func NewScope(parent ScopeID, attributes ...core.KeyValue) ScopeID {
eventID := Record(Event{
Type: NEW_SCOPE,
Scope: parent,
Attributes: attributes,
})
return ScopeID{
EventID: eventID,
SpanContext: parent.SpanContext,
}
}
8 changes: 5 additions & 3 deletions experimental/streaming/exporter/reader/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"go.opentelemetry.io/api/core"
"go.opentelemetry.io/api/key"
"go.opentelemetry.io/experimental/streaming/exporter/reader"
"go.opentelemetry.io/experimental/streaming/sdk/trace"

// TODO this should not be an SDK dependency; move conventional tags into the API.
"go.opentelemetry.io/experimental/streaming/sdk"
)

var (
Expand Down Expand Up @@ -119,10 +121,10 @@ func AppendEvent(buf *strings.Builder, data reader.Event) {
data.Tags.Foreach(f(true))
}
if data.SpanContext.HasSpanID() {
f(false)(trace.SpanIDKey.String(data.SpanContext.SpanIDString()))
f(false)(sdk.SpanIDKey.String(data.SpanContext.SpanIDString()))
}
if data.SpanContext.HasTraceID() {
f(false)(trace.TraceIDKey.String(data.SpanContext.TraceIDString()))
f(false)(sdk.TraceIDKey.String(data.SpanContext.TraceIDString()))
}

buf.WriteString(" ]\n")
Expand Down
7 changes: 6 additions & 1 deletion experimental/streaming/exporter/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func NewReaderObserver(readers ...Reader) observer.Observer {
}

func (ro *readerObserver) Observe(event observer.Event) {
// TODO this should check for out-of-order events and buffer.
ro.orderedObserve(event)
}

func (ro *readerObserver) orderedObserve(event observer.Event) {
read := Event{
Time: event.Time,
Sequence: event.Sequence,
Expand Down Expand Up @@ -169,7 +174,7 @@ func (ro *readerObserver) Observe(event observer.Event) {
case observer.FINISH_SPAN:
attrs, span := ro.readScope(event.Scope)
if span == nil {
panic("span not found")
panic(fmt.Sprint("span not found", event.Scope))
}

read.Name = span.name
Expand Down
3 changes: 1 addition & 2 deletions experimental/streaming/exporter/spanlog/spanlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"os"
"strings"

"go.opentelemetry.io/experimental/streaming/exporter/buffer"
"go.opentelemetry.io/experimental/streaming/exporter/observer"
"go.opentelemetry.io/experimental/streaming/exporter/spandata"
"go.opentelemetry.io/experimental/streaming/exporter/spandata/format"
Expand All @@ -27,7 +26,7 @@ import (
type spanLog struct{}

func New() observer.Observer {
return buffer.NewBuffer(1000, spandata.NewReaderObserver(&spanLog{}))
return spandata.NewReaderObserver(&spanLog{})
}

func (s *spanLog) Read(data *spandata.Span) {
Expand Down
48 changes: 0 additions & 48 deletions experimental/streaming/sdk/event/event.go

This file was deleted.

9 changes: 9 additions & 0 deletions experimental/streaming/sdk/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sdk

import (
"go.opentelemetry.io/api/trace"
)

func init() {
trace.SetGlobalTracer(New())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package trace
package sdk

import (
"context"
"sync"

"google.golang.org/grpc/codes"

Expand All @@ -28,22 +27,14 @@ import (
)

type span struct {
tracer *tracer
spanContext core.SpanContext
lock sync.Mutex
eventID observer.EventID
finishOnce sync.Once
recordEvent bool
status codes.Code
tracer *tracer
initial observer.ScopeID
}

// SpancContext returns span context of the span. Return SpanContext is usable
// even after the span is finished.
func (sp *span) SpanContext() core.SpanContext {
if sp == nil {
return core.INVALID_SPAN_CONTEXT
}
return sp.spanContext
return sp.initial.SpanContext
}

// IsRecordingEvents returns true is the span is active and recording events is enabled.
Expand All @@ -53,122 +44,55 @@ func (sp *span) IsRecordingEvents() bool {

// SetStatus sets the status of the span.
func (sp *span) SetStatus(status codes.Code) {
if sp == nil {
return
}
sid := sp.ScopeID()

observer.Record(observer.Event{
Type: observer.SET_STATUS,
Scope: sid,
Sequence: sid.EventID,
Status: status,
Type: observer.SET_STATUS,
Scope: sp.ScopeID(),
Status: status,
})
sp.status = status
}

func (sp *span) ScopeID() observer.ScopeID {
if sp == nil {
return observer.ScopeID{}
}
sp.lock.Lock()
sid := observer.ScopeID{
EventID: sp.eventID,
SpanContext: sp.spanContext,
}
sp.lock.Unlock()
return sid
}

func (sp *span) updateScope() (observer.ScopeID, observer.EventID) {
next := observer.NextEventID()

sp.lock.Lock()
sid := observer.ScopeID{
EventID: sp.eventID,
SpanContext: sp.spanContext,
}
sp.eventID = next
sp.lock.Unlock()

return sid, next
}

func (sp *span) SetError(v bool) {
sp.SetAttribute(ErrorKey.Bool(v))
return sp.initial
}

func (sp *span) SetAttribute(attribute core.KeyValue) {
if sp == nil {
return
}

sid, next := sp.updateScope()

observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Attribute: attribute,
})
}

func (sp *span) SetAttributes(attributes ...core.KeyValue) {
if sp == nil {
return
}

sid, next := sp.updateScope()

observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Attributes: attributes,
})
}

func (sp *span) ModifyAttribute(mutator tag.Mutator) {
if sp == nil {
return
}

sid, next := sp.updateScope()

observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Mutator: mutator,
Type: observer.MODIFY_ATTR,
Scope: sp.ScopeID(),
Mutator: mutator,
})
}

func (sp *span) ModifyAttributes(mutators ...tag.Mutator) {
if sp == nil {
return
}

sid, next := sp.updateScope()

observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Mutators: mutators,
})
}

func (sp *span) Finish() {
if sp == nil {
return
}
recovered := recover()
sp.finishOnce.Do(func() {
observer.Record(observer.Event{
Type: observer.FINISH_SPAN,
Scope: sp.ScopeID(),
Recovered: recovered,
})
observer.Record(observer.Event{
Type: observer.FINISH_SPAN,
Scope: sp.ScopeID(),
Recovered: recovered,
})
if recovered != nil {
panic(recovered)
Expand Down
Loading

0 comments on commit b26d667

Please sign in to comment.