diff --git a/agent/agent.go b/agent/agent.go index 3b26e199..02bd52e1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -56,13 +56,15 @@ type ( logger *log.Logger printReportOnce sync.Once + + cache *localCache } Option func(*Agent) ) var ( - version = "0.3.1-pre1" + version = "0.3.1-pre2" testingModeFrequency = time.Second nonTestingModeFrequency = time.Minute @@ -376,6 +378,9 @@ func NewAgent(options ...Option) (*Agent, error) { agent.logMetadata() } + // + agent.cache = newLocalCache(agent.getRemoteConfigRequest(), cacheTimeout, agent.debugMode, agent.logger) + agent.recorder = NewSpanRecorder(agent) var recorder tracer.SpanRecorder = agent.recorder if agent.optionalRecorders != nil { diff --git a/agent/cache.go b/agent/cache.go new file mode 100644 index 00000000..eede8e39 --- /dev/null +++ b/agent/cache.go @@ -0,0 +1,153 @@ +package agent + +import ( + "crypto/sha1" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/mitchellh/go-homedir" +) + +const cacheTimeout = 1 * time.Minute + +type ( + localCache struct { + m sync.Mutex + tenant interface{} + basePath string + timeout time.Duration + debugMode bool + logger *log.Logger + } + cacheItem struct { + Value interface{} + } +) + +// Create a new local cache +func newLocalCache(tenant interface{}, timeout time.Duration, debugMode bool, logger *log.Logger) *localCache { + lc := &localCache{ + timeout: timeout, + debugMode: debugMode, + logger: logger, + } + lc.SetTenant(tenant) + return lc +} + +// Gets or sets a local cache value +func (c *localCache) GetOrSet(key string, useTimeout bool, fn func(interface{}, string) interface{}) interface{} { + c.m.Lock() + defer c.m.Unlock() + + // Loader function + loaderFunc := func(key string, err error, fn func(interface{}, string) interface{}) interface{} { + if err != nil { + c.logger.Printf("Local cache: %v", err) + } + if fn == nil { + return nil + } + + // Call the loader + resp := fn(c.tenant, key) + + if resp != nil { + path := fmt.Sprintf("%s.%s", c.basePath, key) + cItem := &cacheItem{Value: resp} + // Save a local cache for the response + if data, err := json.Marshal(cItem); err == nil { + if c.debugMode { + c.logger.Printf("Local cache saving: %s => %s", path, string(data)) + } + if err := ioutil.WriteFile(path, data, 0755); err != nil { + c.logger.Printf("Error writing json file: %v", err) + } + } + } + + return resp + } + + path := fmt.Sprintf("%s.%s", c.basePath, key) + + // We try to load the cached value + file, err := os.Open(path) + if err != nil { + return loaderFunc(key, err, fn) + } + defer file.Close() + + // Checks if the cache data is old + if useTimeout { + fInfo, err := file.Stat() + if err != nil { + return loaderFunc(key, err, fn) + } + sTime := time.Now().Sub(fInfo.ModTime()) + if sTime > cacheTimeout { + err = errors.New(fmt.Sprintf("The local cache key '%s' has timeout: %v", path, sTime)) + return loaderFunc(key, err, fn) + } + } + + // Read the cached value + fileBytes, err := ioutil.ReadAll(file) + if err != nil { + return loaderFunc(key, err, fn) + } + + // Unmarshal json data + var cItem cacheItem + if err := json.Unmarshal(fileBytes, &cItem); err != nil { + return loaderFunc(key, err, fn) + } else { + c.logger.Printf("Local cache loaded: %s (%d bytes)", path, len(fileBytes)) + return cItem.Value + } +} + +// Sets the local cache tenant +func (c *localCache) SetTenant(tenant interface{}) { + homeDir, err := homedir.Dir() + if err != nil { + c.logger.Printf("local cache error: %v", err) + return + } + data, err := json.Marshal(tenant) + if err != nil { + c.logger.Printf("local cache error: %v", err) + return + } + hash := fmt.Sprintf("%x", sha1.Sum(data)) + + var folder string + if runtime.GOOS == "windows" { + folder = fmt.Sprintf("%s/AppData/Roaming/scope/cache", homeDir) + } else { + folder = fmt.Sprintf("%s/.scope/cache", homeDir) + } + + if _, err := os.Stat(folder); err == nil { + c.tenant = tenant + c.basePath = filepath.Join(folder, hash) + } else if os.IsNotExist(err) { + err = os.MkdirAll(folder, 0755) + if err != nil { + c.logger.Printf("local cache error: %v", err) + return + } + c.tenant = tenant + c.basePath = filepath.Join(folder, hash) + } else { + c.logger.Printf("local cache error: %v", err) + } +} diff --git a/agent/cache_test.go b/agent/cache_test.go new file mode 100644 index 00000000..a58f081f --- /dev/null +++ b/agent/cache_test.go @@ -0,0 +1,51 @@ +package agent + +import ( + "fmt" + "log" + "os" + "testing" + "time" +) + +func getTenant() interface{} { + return map[string]string{ + "key1": "value1", + "key2": fmt.Sprintf("%v", time.Now()), + } +} + +func TestLocalCache(t *testing.T) { + + tenant := getTenant() + + cache := newLocalCache(tenant, cacheTimeout, true, log.New(os.Stdout, "", 0)) + loader := false + result := cache.GetOrSet("MyKey01", false, func(i interface{}, s string) interface{} { + loader = true + return "hello world" + }) + + if !loader { + t.Fatal("loader has not been executed.") + } + if result.(string) != "hello world" { + t.Fatal("result was different than expected.") + } + + cache2 := newLocalCache(tenant, cacheTimeout, true, log.New(os.Stdout, "", 0)) + loader = false + for i := 0; i < 10; i++ { + result = cache2.GetOrSet("MyKey01", false, func(i interface{}, s string) interface{} { + loader = true + return "hello world" + }) + + if loader { + t.Fatal("loader has been executed.") + } + if result.(string) != "hello world" { + t.Fatal("result was different than expected.") + } + } +} diff --git a/agent/ntp.go b/agent/ntp.go index c70d213a..7a695cc3 100644 --- a/agent/ntp.go +++ b/agent/ntp.go @@ -9,7 +9,7 @@ import ( const ( server = "pool.ntp.org" retries = 5 - timeout = 1 * time.Second + timeout = 2 * time.Second backoff = 1 * time.Second ) @@ -38,12 +38,17 @@ func (r *SpanRecorder) applyNTPOffset(t time.Time) time.Time { if r.debugMode { r.logger.Println("calculating ntp offset.") } - offset, err := getNTPOffset() - if err == nil { - ntpOffset = offset + offset := r.cache.GetOrSet("ntp", true, func(d interface{}, s string) interface{} { + oSet, err := getNTPOffset() + if err != nil { + r.logger.Printf("error calculating the ntp offset: %v\n", err) + return nil + } + return float64(oSet) + }) + if offset != nil { + ntpOffset = time.Duration(offset.(float64)) r.logger.Printf("ntp offset: %v\n", ntpOffset) - } else { - r.logger.Printf("error calculating the ntp offset: %v\n", err) } }) return t.Add(ntpOffset) diff --git a/agent/recorder.go b/agent/recorder.go index 103a37ad..5d994aad 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -46,6 +46,7 @@ type ( logger *log.Logger stats *RecorderStats statsOnce sync.Once + cache *localCache } RecorderStats struct { totalSpans int64 @@ -76,6 +77,7 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder { r.debugMode = agent.debugMode r.metadata = agent.metadata r.logger = agent.logger + r.cache = agent.cache r.flushFrequency = agent.flushFrequency r.url = agent.getUrl("api/agent/ingest") r.client = &http.Client{} diff --git a/agent/remote_config.go b/agent/remote_config.go index 18250af2..858ce2e2 100644 --- a/agent/remote_config.go +++ b/agent/remote_config.go @@ -2,7 +2,6 @@ package agent import ( "bytes" - "crypto/sha1" "crypto/x509" "encoding/json" "errors" @@ -10,18 +9,24 @@ import ( "io/ioutil" "net/http" "net/url" - "os" - "path/filepath" - "runtime" "time" - "github.com/mitchellh/go-homedir" - "go.undefinedlabs.com/scopeagent/tags" ) // Loads the remote agent configuration from local cache, if not exists then retrieve it from the server func (a *Agent) loadRemoteConfiguration() map[string]interface{} { + if a == nil { + return nil + } + configResponse := a.cache.GetOrSet("remoteconfig", false, a.getRemoteConfiguration) + if configResponse != nil { + return configResponse.(map[string]interface{}) + } + return nil +} + +func (a *Agent) getRemoteConfigRequest() map[string]interface{} { if a == nil || a.metadata == nil { return nil } @@ -40,13 +45,13 @@ func (a *Agent) loadRemoteConfiguration() map[string]interface{} { } if a.debugMode { jsBytes, _ := json.Marshal(configRequest) - a.logger.Printf("Getting remote configuration for: %v", string(jsBytes)) + a.logger.Printf("Configuration request: %v", string(jsBytes)) } - return a.getOrSetRemoteConfigurationCache(configRequest, a.getRemoteConfiguration) + return configRequest } // Gets the remote agent configuration from the endpoint + api/agent/config -func (a *Agent) getRemoteConfiguration(cfgRequest map[string]interface{}) map[string]interface{} { +func (a *Agent) getRemoteConfiguration(cfgRequest interface{}, key string) interface{} { client := &http.Client{} curl := a.getUrl("api/agent/config") payload, err := msgPackEncodePayload(cfgRequest) @@ -138,93 +143,3 @@ func (a *Agent) getRemoteConfiguration(cfgRequest map[string]interface{}) map[st } return nil } - -// Gets or sets the remote agent configuration local cache -func (a *Agent) getOrSetRemoteConfigurationCache(metadata map[string]interface{}, fn func(map[string]interface{}) map[string]interface{}) map[string]interface{} { - if metadata == nil { - return nil - } - var ( - path string - err error - ) - path, err = getRemoteConfigurationCachePath(metadata) - if err == nil { - // We try to load the cached version of the remote configuration - file, lerr := os.Open(path) - err = lerr - if lerr == nil { - defer file.Close() - fileBytes, lerr := ioutil.ReadAll(file) - err = lerr - if lerr == nil { - var res map[string]interface{} - if lerr = json.Unmarshal(fileBytes, &res); lerr == nil { - if a.debugMode { - a.logger.Printf("Remote configuration cache: %v", string(fileBytes)) - } else { - a.logger.Printf("Remote configuration cache: %v", path) - } - return res - } else { - err = lerr - } - } - } - } - if err != nil { - a.logger.Printf("Remote configuration cache: %v", err) - } - - if fn == nil { - return nil - } - - // Call the loader - resp := fn(metadata) - - if resp != nil && path != "" { - // Save a local cache for the response - if data, err := json.Marshal(&resp); err == nil { - if a.debugMode { - a.logger.Printf("Saving Remote configuration cache: %v", string(data)) - } - if err := ioutil.WriteFile(path, data, 0755); err != nil { - a.logger.Printf("Error writing json file: %v", err) - } - } - } - return resp -} - -// Gets the remote agent configuration local cache path -func getRemoteConfigurationCachePath(metadata map[string]interface{}) (string, error) { - homeDir, err := homedir.Dir() - if err != nil { - return "", err - } - data, err := json.Marshal(metadata) - if err != nil { - return "", err - } - hash := fmt.Sprintf("%x", sha1.Sum(data)) - - var folder string - if runtime.GOOS == "windows" { - folder = fmt.Sprintf("%s/AppData/Roaming/scope/cache", homeDir) - } else { - folder = fmt.Sprintf("%s/.scope/cache", homeDir) - } - - if _, err := os.Stat(folder); err == nil { - return filepath.Join(folder, hash), nil - } else if os.IsNotExist(err) { - err = os.MkdirAll(folder, 0755) - if err != nil { - return "", err - } - return filepath.Join(folder, hash), nil - } else { - return "", err - } -} diff --git a/agent/util.go b/agent/util.go index 2e34838f..223dc853 100644 --- a/agent/util.go +++ b/agent/util.go @@ -35,7 +35,7 @@ func getSourceRootFromEnv(key string) string { } // Encodes `payload` using msgpack and compress it with gzip -func msgPackEncodePayload(payload map[string]interface{}) (*bytes.Buffer, error) { +func msgPackEncodePayload(payload interface{}) (*bytes.Buffer, error) { binaryPayload, err := msgpack.Marshal(payload) if err != nil { return nil, err diff --git a/instrumentation/testing/testing.go b/instrumentation/testing/testing.go index 5903a39f..7b6dd78c 100644 --- a/instrumentation/testing/testing.go +++ b/instrumentation/testing/testing.go @@ -60,59 +60,81 @@ func StartTest(t *testing.T, opts ...Option) *Test { // Starts a new test with and uses the caller pc info for Name and Suite func StartTestFromCaller(t *testing.T, pc uintptr, opts ...Option) *Test { - // Get or create a new Test struct - // If we get an old struct we replace the current span and context with a new one. - // Useful if we want to overwrite the Start call with options - test, exist := getOrCreateTest(t) - if exist { - // If there is already one we want to replace it, so we clear the context - test.ctx = context.Background() - } - test.codePC = pc - - for _, opt := range opts { - opt(test) - } - - // Extracting the testing func name (by removing any possible sub-test suffix `{test_func}/{sub_test}`) - // to search the func source code bounds and to calculate the package name. - fullTestName := runner.GetOriginalTestName(t.Name()) - pName, _, testCode := getPackageAndNameAndBoundaries(pc) - - testTags := opentracing.Tags{ - "span.kind": "test", - "test.name": fullTestName, - "test.suite": pName, - "test.framework": "testing", - "test.language": "go", - } - - if testCode != "" { - testTags["test.code"] = testCode - } - if test.ctx == nil { - test.ctx = context.Background() - } + // check if the test is cached + if isTestCached(t, pc) { - span, ctx := opentracing.StartSpanFromContextWithTracer(test.ctx, instrumentation.Tracer(), fullTestName, testTags) - span.SetBaggageItem("trace.kind", "test") + test := &Test{t: t, ctx: context.Background()} + for _, opt := range opts { + opt(test) + } - if isTestCached(t, pc) { + // Extracting the testing func name (by removing any possible sub-test suffix `{test_func}/{sub_test}`) + // to search the func source code bounds and to calculate the package name. + fullTestName := runner.GetOriginalTestName(t.Name()) + pName, _ := getPackageAndName(pc) + + testTags := opentracing.Tags{ + "span.kind": "test", + "test.name": fullTestName, + "test.suite": pName, + "test.framework": "testing", + "test.language": "go", + } + span, _ := opentracing.StartSpanFromContextWithTracer(test.ctx, instrumentation.Tracer(), fullTestName, testTags) + span.SetBaggageItem("trace.kind", "test") span.SetTag("test.status", tags.TestStatus_CACHE) span.Finish() - // Remove the Test struct from the hash map, so a call to Start while we end this instance will create a new struct - removeTest(t) t.SkipNow() - } + return test - test.span = span - test.ctx = ctx + } else { + + // Get or create a new Test struct + // If we get an old struct we replace the current span and context with a new one. + // Useful if we want to overwrite the Start call with options + test, exist := getOrCreateTest(t) + if exist { + // If there is already one we want to replace it, so we clear the context + test.ctx = context.Background() + } + test.codePC = pc + + for _, opt := range opts { + opt(test) + } + + // Extracting the testing func name (by removing any possible sub-test suffix `{test_func}/{sub_test}`) + // to search the func source code bounds and to calculate the package name. + fullTestName := runner.GetOriginalTestName(t.Name()) + pName, _, testCode := getPackageAndNameAndBoundaries(pc) + + testTags := opentracing.Tags{ + "span.kind": "test", + "test.name": fullTestName, + "test.suite": pName, + "test.framework": "testing", + "test.language": "go", + } - logging.Reset() - startCoverage() + if testCode != "" { + testTags["test.code"] = testCode + } - return test + if test.ctx == nil { + test.ctx = context.Background() + } + + span, ctx := opentracing.StartSpanFromContextWithTracer(test.ctx, instrumentation.Tracer(), fullTestName, testTags) + span.SetBaggageItem("trace.kind", "test") + test.span = span + test.ctx = ctx + + logging.Reset() + startCoverage() + + return test + } } // Set test code