Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.
7 changes: 6 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ type (

logger *log.Logger
printReportOnce sync.Once

cache *localCache
}

Option func(*Agent)
)

var (
version = "0.3.1-pre1"
version = "0.3.1-pre2"

testingModeFrequency = time.Second
nonTestingModeFrequency = time.Minute
Expand Down Expand Up @@ -376,6 +378,9 @@ func NewAgent(options ...Option) (*Agent, error) {
agent.logMetadata()
}

//
agent.cache = newLocalCache(agent.getRemoteConfigRequest(), cacheTimeout, agent.debugMode, agent.logger)

agent.recorder = NewSpanRecorder(agent)
var recorder tracer.SpanRecorder = agent.recorder
if agent.optionalRecorders != nil {
Expand Down
153 changes: 153 additions & 0 deletions agent/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package agent

import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/mitchellh/go-homedir"
)

const cacheTimeout = 1 * time.Minute

type (
localCache struct {
m sync.Mutex
tenant interface{}
basePath string
timeout time.Duration
debugMode bool
logger *log.Logger
}
cacheItem struct {
Value interface{}
}
)

// Create a new local cache
func newLocalCache(tenant interface{}, timeout time.Duration, debugMode bool, logger *log.Logger) *localCache {
lc := &localCache{
timeout: timeout,
debugMode: debugMode,
logger: logger,
}
lc.SetTenant(tenant)
return lc
}

// Gets or sets a local cache value
func (c *localCache) GetOrSet(key string, useTimeout bool, fn func(interface{}, string) interface{}) interface{} {
c.m.Lock()
defer c.m.Unlock()

// Loader function
loaderFunc := func(key string, err error, fn func(interface{}, string) interface{}) interface{} {
if err != nil {
c.logger.Printf("Local cache: %v", err)
}
if fn == nil {
return nil
}

// Call the loader
resp := fn(c.tenant, key)

if resp != nil {
path := fmt.Sprintf("%s.%s", c.basePath, key)
cItem := &cacheItem{Value: resp}
// Save a local cache for the response
if data, err := json.Marshal(cItem); err == nil {
if c.debugMode {
c.logger.Printf("Local cache saving: %s => %s", path, string(data))
}
if err := ioutil.WriteFile(path, data, 0755); err != nil {
c.logger.Printf("Error writing json file: %v", err)
}
}
}

return resp
}

path := fmt.Sprintf("%s.%s", c.basePath, key)

// We try to load the cached value
file, err := os.Open(path)
if err != nil {
return loaderFunc(key, err, fn)
}
defer file.Close()

// Checks if the cache data is old
if useTimeout {
fInfo, err := file.Stat()
if err != nil {
return loaderFunc(key, err, fn)
}
sTime := time.Now().Sub(fInfo.ModTime())
if sTime > cacheTimeout {
err = errors.New(fmt.Sprintf("The local cache key '%s' has timeout: %v", path, sTime))
return loaderFunc(key, err, fn)
}
}

// Read the cached value
fileBytes, err := ioutil.ReadAll(file)
if err != nil {
return loaderFunc(key, err, fn)
}

// Unmarshal json data
var cItem cacheItem
if err := json.Unmarshal(fileBytes, &cItem); err != nil {
return loaderFunc(key, err, fn)
} else {
c.logger.Printf("Local cache loaded: %s (%d bytes)", path, len(fileBytes))
return cItem.Value
}
}

// Sets the local cache tenant
func (c *localCache) SetTenant(tenant interface{}) {
homeDir, err := homedir.Dir()
if err != nil {
c.logger.Printf("local cache error: %v", err)
return
}
data, err := json.Marshal(tenant)
if err != nil {
c.logger.Printf("local cache error: %v", err)
return
}
hash := fmt.Sprintf("%x", sha1.Sum(data))

var folder string
if runtime.GOOS == "windows" {
folder = fmt.Sprintf("%s/AppData/Roaming/scope/cache", homeDir)
} else {
folder = fmt.Sprintf("%s/.scope/cache", homeDir)
}

if _, err := os.Stat(folder); err == nil {
c.tenant = tenant
c.basePath = filepath.Join(folder, hash)
} else if os.IsNotExist(err) {
err = os.MkdirAll(folder, 0755)
if err != nil {
c.logger.Printf("local cache error: %v", err)
return
}
c.tenant = tenant
c.basePath = filepath.Join(folder, hash)
} else {
c.logger.Printf("local cache error: %v", err)
}
}
51 changes: 51 additions & 0 deletions agent/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package agent

import (
"fmt"
"log"
"os"
"testing"
"time"
)

func getTenant() interface{} {
return map[string]string{
"key1": "value1",
"key2": fmt.Sprintf("%v", time.Now()),
}
}

func TestLocalCache(t *testing.T) {

tenant := getTenant()

cache := newLocalCache(tenant, cacheTimeout, true, log.New(os.Stdout, "", 0))
loader := false
result := cache.GetOrSet("MyKey01", false, func(i interface{}, s string) interface{} {
loader = true
return "hello world"
})

if !loader {
t.Fatal("loader has not been executed.")
}
if result.(string) != "hello world" {
t.Fatal("result was different than expected.")
}

cache2 := newLocalCache(tenant, cacheTimeout, true, log.New(os.Stdout, "", 0))
loader = false
for i := 0; i < 10; i++ {
result = cache2.GetOrSet("MyKey01", false, func(i interface{}, s string) interface{} {
loader = true
return "hello world"
})

if loader {
t.Fatal("loader has been executed.")
}
if result.(string) != "hello world" {
t.Fatal("result was different than expected.")
}
}
}
17 changes: 11 additions & 6 deletions agent/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
const (
server = "pool.ntp.org"
retries = 5
timeout = 1 * time.Second
timeout = 2 * time.Second
backoff = 1 * time.Second
)

Expand Down Expand Up @@ -38,12 +38,17 @@ func (r *SpanRecorder) applyNTPOffset(t time.Time) time.Time {
if r.debugMode {
r.logger.Println("calculating ntp offset.")
}
offset, err := getNTPOffset()
if err == nil {
ntpOffset = offset
offset := r.cache.GetOrSet("ntp", true, func(d interface{}, s string) interface{} {
oSet, err := getNTPOffset()
if err != nil {
r.logger.Printf("error calculating the ntp offset: %v\n", err)
return nil
}
return float64(oSet)
})
if offset != nil {
ntpOffset = time.Duration(offset.(float64))
r.logger.Printf("ntp offset: %v\n", ntpOffset)
} else {
r.logger.Printf("error calculating the ntp offset: %v\n", err)
}
})
return t.Add(ntpOffset)
Expand Down
2 changes: 2 additions & 0 deletions agent/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
logger *log.Logger
stats *RecorderStats
statsOnce sync.Once
cache *localCache
}
RecorderStats struct {
totalSpans int64
Expand Down Expand Up @@ -76,6 +77,7 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
r.debugMode = agent.debugMode
r.metadata = agent.metadata
r.logger = agent.logger
r.cache = agent.cache
r.flushFrequency = agent.flushFrequency
r.url = agent.getUrl("api/agent/ingest")
r.client = &http.Client{}
Expand Down
Loading