Skip to content

Commit

Permalink
Merge ec5eeff into e86551d
Browse files Browse the repository at this point in the history
  • Loading branch information
isaachier authored Dec 5, 2017
2 parents e86551d + ec5eeff commit 2183a2f
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestTBufferedReadTransport(t *testing.T) {

secondRead := make([]byte, 7)
n, err = trans.Read(secondRead)
require.NoError(t, err)
require.Equal(t, 6, n)
require.Equal(t, []byte("String"), secondRead[0:6])
require.Equal(t, uint64(0), trans.RemainingBytes())
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/servers/thriftudp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestFlushErrors(t *testing.T) {

trans.Write([]byte{1, 2, 3, 4})
err = trans.Flush()
require.Error(t, trans.Flush(), "Flush with data should fail")
require.Error(t, err, "Flush with data should fail")
})
}

Expand Down
6 changes: 3 additions & 3 deletions crossdock/services/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func TestGetSamplingRate(t *testing.T) {

// Test with no http server
agent := NewAgentService("", zap.NewNop())
rate, err := agent.GetSamplingRate("svc", "op")
_, err := agent.GetSamplingRate("svc", "op")
assert.Error(t, err)

agent = NewAgentService(server.URL, zap.NewNop())
rate, err = agent.GetSamplingRate("svc", "op")
rate, err := agent.GetSamplingRate("svc", "op")
assert.NoError(t, err)
assert.EqualValues(t, 1, rate)

rate, err = agent.GetSamplingRate("bad_svc", "op")
_, err = agent.GetSamplingRate("bad_svc", "op")
assert.Error(t, err)
}
2 changes: 1 addition & 1 deletion crossdock/services/mocks/T.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package mocks

import (
"github.com/stretchr/testify/mock"
"github.com/crossdock/crossdock-go"
"github.com/stretchr/testify/mock"
)

// T is an autogenerated mock type for the T type
Expand Down
6 changes: 3 additions & 3 deletions crossdock/services/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func TestGetTraces(t *testing.T) {

// Test with no http server
query := NewQueryService("", zap.NewNop())
traces, err := query.GetTraces("svc", "op", map[string]string{"key": "value"})
_, err := query.GetTraces("svc", "op", map[string]string{"key": "value"})
assert.Error(t, err)

query = NewQueryService(server.URL, zap.NewNop())
traces, err = query.GetTraces("svc", "op", map[string]string{"key": "value"})
traces, err := query.GetTraces("svc", "op", map[string]string{"key": "value"})
assert.NoError(t, err)
assert.Len(t, traces, 1)
assert.EqualValues(t, "traceid", traces[0].TraceID)

traces, err = query.GetTraces("bad_svc", "op", map[string]string{"key": "value"})
_, err = query.GetTraces("bad_svc", "op", map[string]string{"key": "value"})
assert.Error(t, err)
}
2 changes: 1 addition & 1 deletion examples/hotrod/services/driver/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (r *Redis) FindDriverIDs(ctx context.Context, location string) []string {
span.SetTag("param.location", location)
ext.SpanKindRPCClient.Set(span)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
_ = opentracing.ContextWithSpan(ctx, span)
}
// simulate RPC delay
delay.Sleep(config.RedisFindDelay, config.RedisFindDelayStdDev)
Expand Down
19 changes: 18 additions & 1 deletion model/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (kv *KeyValue) IsLess(two *KeyValue) bool {
}
}

// Matches checks if this KeyValue matches the query key/value pair
func (kv *KeyValue) Matches(key, value string) bool {
return kv.Key == key && kv.AsString() == value
}

func (kvs KeyValues) Len() int { return len(kvs) }
func (kvs KeyValues) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] }
func (kvs KeyValues) Less(i, j int) bool {
Expand All @@ -258,7 +263,7 @@ func (kvs KeyValues) FindByKey(key string) (KeyValue, bool) {
return KeyValue{}, false
}

// Equal compares KyValues with another list. Both lists must be already sorted.
// Equal compares KeyValues with another list. Both lists must be already sorted.
func (kvs KeyValues) Equal(other KeyValues) bool {
l1, l2 := len(kvs), len(other)
if l1 != l2 {
Expand All @@ -272,6 +277,18 @@ func (kvs KeyValues) Equal(other KeyValues) bool {
return true
}

// FindMatch checks for a matching KeyValue in the KeyValues for the given
// query key/value pair. Returns the match and true when the match is found.
// Returns empty KeyValue and false when no match is found.
func (kvs KeyValues) FindMatch(key, value string) (KeyValue, bool) {
for _, kv := range kvs {
if kv.Matches(key, value) {
return kv, true
}
}
return KeyValue{}, false
}

// Hash implements Hash from Hashable.
func (kvs KeyValues) Hash(w io.Writer) error {
for i := range kvs {
Expand Down
10 changes: 10 additions & 0 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ func (s *Span) NormalizeTimestamps() {
}
}

// FlattenTags combines span tags, process tags, and log fields into one KeyValues collection
func (s *Span) FlattenTags() KeyValues {
retMe := s.Tags
retMe = append(retMe, s.Process.Tags...)
for _, l := range s.Logs {
retMe = append(retMe, l.Fields...)
}
return retMe
}

// ------- Flags -------

// SetSampled sets the Flags as sampled
Expand Down
9 changes: 6 additions & 3 deletions pkg/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,34 @@ func TestCompareAndSwap(t *testing.T) {
assert.Equal(t, 1, cache.Size())

item, ok = cache.CompareAndSwap("B", nil, "Bar")
assert.True(t, ok)
assert.Equal(t, 2, cache.Size())
assert.Equal(t, "Bar", item)
assert.Equal(t, "Bar", cache.Get("B"))

item, ok = cache.CompareAndSwap("A", "Foo", "Foo2")
assert.Equal(t, true, ok)
assert.True(t, ok)
assert.Equal(t, "Foo2", item)
assert.Equal(t, "Foo2", cache.Get("A"))

item, ok = cache.CompareAndSwap("A", nil, "Foo3")
assert.Equal(t, false, ok)
assert.False(t, ok)
assert.Equal(t, "Foo2", item)
assert.Equal(t, "Foo2", cache.Get("A"))

item, ok = cache.CompareAndSwap("A", "Foo", "Foo3")
assert.False(t, ok)
assert.Equal(t, "Foo2", item)
assert.Equal(t, "Foo2", cache.Get("A"))

item, ok = cache.CompareAndSwap("F", "foo", "Foo3")
assert.Equal(t, false, ok)
assert.False(t, ok)
assert.Nil(t, item)
assert.Nil(t, cache.Get("F"))

// Evict the oldest entry
item, ok = cache.CompareAndSwap("E", nil, "Epsi")
assert.True(t, ok)
assert.Equal(t, "Epsi", item)
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Nil(t, cache.Get("B")) // Oldest, should be evicted
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributedlock/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ type Lock interface {
// acquired is meaningless.
Acquire(resource string, ttl time.Duration) (acquired bool, err error)

// Forfeit forfeits a lease around a given resource. In case of an error,
// forfeited is meaningless.
Forfeit(resource string) (forfeited bool, err error)
}
5 changes: 5 additions & 0 deletions pkg/distributedlock/mocks/Lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"github.com/stretchr/testify/mock"
)

// Lock mocks distributed lock for control of a resource.
type Lock struct {
mock.Mock
}

// Acquire acquires a lease of duration ttl around a given resource. In case of an error,
// acquired is meaningless.
func (_m *Lock) Acquire(resource string, ttl time.Duration) (bool, error) {
ret := _m.Called(resource, ttl)

Expand All @@ -44,6 +47,8 @@ func (_m *Lock) Acquire(resource string, ttl time.Duration) (bool, error) {
return r0, r1
}

// Forfeit forfeits a lease around a given resource. In case of an error,
// forfeited is meaningless.
func (_m *Lock) Forfeit(resource string) (bool, error) {
ret := _m.Called(resource)

Expand Down
2 changes: 1 addition & 1 deletion pkg/es/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ type ESMultiSearchService struct {
multiSearchService *elastic.MultiSearchService
}

// WrapESSearchService creates an ESSearchService out of *elastic.ESSearchService.
// WrapESMultiSearchService creates an ESSearchService out of *elastic.ESSearchService.
func WrapESMultiSearchService(multiSearchService *elastic.MultiSearchService) ESMultiSearchService {
return ESMultiSearchService{multiSearchService: multiSearchService}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/healthcheck/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestPortBusy(t *testing.T) {
func TestHttpCall(t *testing.T) {
logger, _ := zap.NewDevelopment()
state, err := healthcheck.NewState(http.StatusServiceUnavailable, logger)
assert.NoError(t, err)
handler, err := healthcheck.NewHandler(state)
if err != nil {
t.Error("Could not start the health check server.", zap.Error(err))
Expand All @@ -100,15 +101,18 @@ func TestHttpCall(t *testing.T) {
func TestListenerClose(t *testing.T) {
logger, _ := zap.NewDevelopment()
state, err := healthcheck.NewState(http.StatusServiceUnavailable, logger)
assert.NoError(t, err)
handler, err := healthcheck.NewHandler(state)
if err != nil {
t.Error("Could not start the health check server.", zap.Error(err))
}

s := &http.Server{Handler: handler}
l, err := net.Listen("tcp", ":0")
assert.NoError(t, err)
defer l.Close()
s, err = healthcheck.ServeWithListener(l, s, logger)
_, err = healthcheck.ServeWithListener(l, s, logger)
assert.NoError(t, err)
}

func TestServeHandler(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions storage/samplingstore/mocks/Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import "github.com/stretchr/testify/mock"

import "time"

// Store writes and retrieves sampling data to and from storage.
type Store struct {
mock.Mock
}

// InsertThroughput inserts aggregated throughput for operations into storage.
func (_m *Store) InsertThroughput(throughput []*model.Throughput) error {
ret := _m.Called(throughput)

Expand All @@ -35,6 +37,8 @@ func (_m *Store) InsertThroughput(throughput []*model.Throughput) error {

return r0
}

// InsertProbabilitiesAndQPS inserts calculated sampling probabilities and measured qps into storage.
func (_m *Store) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS) error {
ret := _m.Called(hostname, probabilities, qps)

Expand All @@ -47,6 +51,8 @@ func (_m *Store) InsertProbabilitiesAndQPS(hostname string, probabilities model.

return r0
}

// GetThroughput retrieves aggregated throughput for operations within a time range.
func (_m *Store) GetThroughput(start time.Time, end time.Time) ([]*model.Throughput, error) {
ret := _m.Called(start, end)

Expand All @@ -68,6 +74,8 @@ func (_m *Store) GetThroughput(start time.Time, end time.Time) ([]*model.Through

return r0, r1
}

// GetProbabilitiesAndQPS retrieves the sampling probabilities and measured qps per host within a time range.
func (_m *Store) GetProbabilitiesAndQPS(start time.Time, end time.Time) (map[string][]model.ServiceOperationData, error) {
ret := _m.Called(start, end)

Expand All @@ -89,6 +97,8 @@ func (_m *Store) GetProbabilitiesAndQPS(start time.Time, end time.Time) (map[str

return r0, r1
}

// GetLatestProbabilities retrieves the latest sampling probabilities.
func (_m *Store) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
ret := _m.Called()

Expand Down
22 changes: 2 additions & 20 deletions storage/spanstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,29 +199,11 @@ func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameter
if !query.StartTimeMax.IsZero() && span.StartTime.After(query.StartTimeMax) {
return false
}
spanKVs := m.flattenTags(span)
spanKVs := span.FlattenTags()
for queryK, queryV := range query.Tags {
keyValueFoundAndMatches := false
// (NB): we cannot find the KeyValue.Find function because there can be multiple tags with the same key
for _, keyValue := range spanKVs {
if keyValue.Key == queryK && keyValue.AsString() == queryV {
keyValueFoundAndMatches = true
break
}
}
if !keyValueFoundAndMatches {
if _, ok := spanKVs.FindMatch(queryK, queryV); !ok {
return false
}
}
return true
}

// TODO: this is a good candidate function to have on a span
func (m *Store) flattenTags(span *model.Span) model.KeyValues {
retMe := span.Tags
retMe = append(retMe, span.Process.Tags...)
for _, l := range span.Logs {
retMe = append(retMe, l.Fields...)
}
return retMe
}

0 comments on commit 2183a2f

Please sign in to comment.