Skip to content

Commit

Permalink
Merge pull request #259 from mimiro-io/feat/slow-log
Browse files Browse the repository at this point in the history
Feat/slow log
  • Loading branch information
gra-moore committed Sep 19, 2023
2 parents 759173c + 734a9ed commit 56d3903
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 55 deletions.
9 changes: 5 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
FROM golang:1.21 as builder_src

COPY jemalloc-install.sh .

RUN apt-get update -y
RUN apt-get install bzip2 -y
RUN bash jemalloc-install.sh
Expand All @@ -17,6 +16,7 @@ COPY go.mod go.sum ./
# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed
RUN go mod download

RUN go install github.com/go-delve/delve/cmd/dlv@latest
# Copy the source from the current directory to the Working Directory inside the container
COPY cmd ./cmd
COPY internal ./internal
Expand All @@ -41,10 +41,11 @@ WORKDIR /root/

COPY --from=builder /app/server .
COPY --from=builder /app/server-gogc .
COPY --from=builder /go/bin/dlv .

# Expose port 8080 to the outside world
EXPOSE 8080
# Expose port 8080 to the outside world, also 40000 for delve
EXPOSE 8080 40000

ENV GOMAXPROCS=128

CMD ["./server"]
CMD ["./server"]
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/dop251/goja => github.com/mimiro-io/goja v1.0.0
require (
github.com/DataDog/datadog-go/v5 v5.3.0
github.com/bamzi/jobrunner v1.0.0
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v1.0.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gojektech/heimdall/v6 v6.1.0
Expand All @@ -32,12 +32,10 @@ require (

require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
44 changes: 0 additions & 44 deletions go.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func loadEnv(basePath *string, loadFromHome bool) (*Env, error) {
PoolFull: viper.GetInt("JOBS_MAX_FULLSYNC"),
Concurrent: 1,
},
SlowLogThreshold: viper.GetDuration("SLOW_LOG_THRESHOLD"),
}, nil
}

Expand Down Expand Up @@ -144,6 +145,7 @@ func parseEnv(basePath *string, logger *zap.SugaredLogger, loadFromHome bool) er
viper.SetDefault("SECURITY_STORAGE_LOCATION", fmt.Sprintf("%s/%s", home, "datahubsecurity"))
viper.SetDefault("JOBS_MAX_INCREMENTAL", 10)
viper.SetDefault("JOBS_MAX_FULLSYNC", 10)
viper.SetDefault("SLOW_LOG_THRESHOLD", "1s")
viper.AutomaticEnv()

viper.SetConfigType("env")
Expand All @@ -166,4 +168,4 @@ func parseEnv(basePath *string, logger *zap.SugaredLogger, loadFromHome bool) er
}

return nil
}
}
3 changes: 2 additions & 1 deletion internal/conf/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Env struct {
SecurityStorageLocation string
BackupSourceLocation string
RunnerConfig *RunnerConfig
SlowLogThreshold time.Duration
}

type AuthConfig struct {
Expand All @@ -67,4 +68,4 @@ type RunnerConfig struct {
PoolIncremental int
PoolFull int
Concurrent int
}
}
44 changes: 44 additions & 0 deletions internal/server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,47 @@ func uint16ToBytes(i CollectionIndex) []byte {
binary.BigEndian.PutUint16(indexBytes, uint16(i))
return indexBytes
}

func collectionToStr(idx uint16) string {
switch idx {
case URIToIDIndexID:
return "URIToIDIndexID"
case EntityIDToJSONIndexID:
return "EntityIDToJSONIndexID"
case IncomingRefIndex:
return "IncomingRefIndex"
case OutgoingRefIndex:
return "OutgoingRefIndex"
case DatasetEntityChangeLog:
return "DatasetEntityChangeLog"
case SysDatasetsID:
return "SysDatasetsID"
case SysJobsID:
return "SysJobsID"
case SysDatasetsSequences:
return "SysDatasetsSequences"
case DatasetLatestEntities:
return "DatasetLatestEntities"
case IDToURIIndexID:
return "IDToURIIndexID"
case uint16(StoreMetaIndex):
return "StoreMetaIndex"
case uint16(NamespacesIndex):
return "NamespacesIndex"
case uint16(JobResultIndex):
return "JobResultIndex"
case uint16(JobDataIndex):
return "JobDataIndex"
case uint16(JobConfigIndex):
return "JobConfigIndex"
case uint16(ContentIndex):
return "ContentIndex"
case uint16(StoreNextDatasetID):
return "StoreNextDatasetID"
case uint16(LoginProviderIndex):
return "LoginProviderIndex"
default:
return "unknown"
}

}
3 changes: 2 additions & 1 deletion internal/server/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ func (ds *Dataset) updateDataset(newItemCount int64, entities []*Entity) error {
func (ds *Dataset) GetChangesWatermark() (uint64, error) {
var waterMark uint64

err := ds.store.database.View(func(txn *badger.Txn) error {
err := ds.store.database.View(func(btxn *badger.Txn) error {
txn := InstrumentedTxn(btxn, ds.store)
searchBuffer := make([]byte, 7)
binary.BigEndian.PutUint16(searchBuffer, DatasetEntityChangeLog)
binary.BigEndian.PutUint32(searchBuffer[2:], ds.InternalID)
Expand Down
124 changes: 124 additions & 0 deletions internal/server/instrumentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package server

import (
"encoding/binary"
"fmt"
"reflect"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v4"
"go.uber.org/zap"
)

type InstrumentedTransaction struct {
txn *badger.Txn
logger *zap.SugaredLogger
store *Store
}

type InstumentedIterator struct {
iter *badger.Iterator
options badger.IteratorOptions
txn *InstrumentedTransaction
itemCalls atomic.Int64
}

func (i *InstumentedIterator) Close() {
i.slowLog(i.iter.Close)
}

func (i *InstumentedIterator) Rewind() {
i.slowLog(i.iter.Rewind)
}

func (i *InstumentedIterator) Item() *badger.Item {
defer i.itemCalls.Add(1)
return slowLogAndReturn(func() *badger.Item { return i.iter.Item() }, i)
}

func (t *InstrumentedTransaction) NewIterator(options badger.IteratorOptions) *InstumentedIterator {
it := &InstumentedIterator{t.txn.NewIterator(options), options, t, atomic.Int64{}}
return slowLogAndReturn(func() *InstumentedIterator {
return it
}, it)
}

func InstrumentedTxn(btxn *badger.Txn, store *Store) *InstrumentedTransaction {
return &InstrumentedTransaction{txn: btxn, logger: store.logger, store: store}
}

// //////////////////// instrumentation details /////////////////////////
func (i *InstumentedIterator) slowLog(c func()) {
t := time.Now()
c()
elapsed := time.Since(t)
if elapsed > i.txn.store.slowLogThreshold {
// log slow call
shortName, structLogPairs := callInfo(i, reflect.ValueOf(c).Pointer())
structLogPairs = append(structLogPairs, "itemCalls", i.itemCalls.Load())
i.txn.logger.Infow(fmt.Sprintf("slow badger call: %v , elapsed: %v", shortName, elapsed), structLogPairs...)
}
}

func slowLogAndReturn[T any](c func() T, i Instrumented) T {
t := time.Now()
result := c()
elapsed := time.Since(t)
if elapsed > i.getStore().slowLogThreshold {
// log slow call
fptr, _, _, _ := runtime.Caller(1)
shortName, structLogPairs := callInfo(i, fptr)
i.getStore().logger.Infow(fmt.Sprintf("slow badger call: %v , elapsed: %v", shortName, elapsed), structLogPairs...)
}
return result
}

type Instrumented interface {
callParams() string
getStore() *Store
}

func (i *InstumentedIterator) getStore() *Store {
return i.txn.store
}

func callInfo(i Instrumented, cptr uintptr) (string, []any) {
f := runtime.FuncForPC(cptr)
fName := f.Name()
tokens := strings.Split(fName, ".")
shortName := tokens[len(tokens)-1]
if len(tokens) > 1 {
shortName = tokens[len(tokens)-2] + "." + tokens[len(tokens)-1]
}

_, file, line, _ := runtime.Caller(3)

return shortName, []any{"function", fName, "params", i.callParams(), "file", file, "line", line}
}

func (i *InstumentedIterator) callParams() string {
return fmt.Sprintf("[prefixDetais:%v, options:%+v ]", i.prefixDetails(i.options.Prefix), i.options)
}

func (i *InstumentedIterator) prefixDetails(prefix []byte) string {
res := "{"
if len(prefix) > 0 {
res += fmt.Sprintf("collection: %v,", collectionToStr(binary.BigEndian.Uint16(prefix)))
}
if len(prefix) > 2 {
internalDsId := binary.BigEndian.Uint32(prefix[2:])
i.txn.store.datasets.Range(func(_ interface{}, v interface{}) bool {
ds := v.(*Dataset)
if ds.InternalID == internalDsId {
res += fmt.Sprintf("dataset: %v,", ds.ID)
return false
}
return true
})
}
res += "}"
return res
}
2 changes: 2 additions & 0 deletions internal/server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Store struct {
blockCacheSize int64
valueLogFileSize int64
maxCompactionLevels int
slowLogThreshold time.Duration
}

type BadgerLogger struct { // we use this to implement the Badger Logger interface
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewStore(lc fx.Lifecycle, env *conf.Env, statsdClient statsd.ClientInterfac
blockCacheSize: env.BlockCacheSize,
valueLogFileSize: env.ValueLogFileSize,
maxCompactionLevels: env.MaxCompactionLevels,
slowLogThreshold: env.SlowLogThreshold,
}
store.NamespaceManager = NewNamespaceManager(store)

Expand Down

0 comments on commit 56d3903

Please sign in to comment.