Skip to content

Commit

Permalink
Switch to slog for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jonyoder committed May 23, 2024
1 parent 4d0dcd0 commit 9b3d440
Show file tree
Hide file tree
Showing 53 changed files with 435 additions and 858 deletions.
8 changes: 3 additions & 5 deletions examples/cmd/markdownRenderer/handlers/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,23 @@ import (
"context"
"fmt"
"io/ioutil"
"log/slog"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/rstudio/platform-lib/examples/cmd/markdownRenderer/runners"
"github.com/rstudio/platform-lib/pkg/rscache"
"github.com/rstudio/platform-lib/pkg/rslog"
)

type HttpHandler struct {
debug rslog.DebugLogger
router *mux.Router
address string
cache rscache.FileCache
}

func NewHttpHandler(debug rslog.DebugLogger, address string, router *mux.Router, cache rscache.FileCache) *HttpHandler {
func NewHttpHandler(address string, router *mux.Router, cache rscache.FileCache) *HttpHandler {
return &HttpHandler{
debug: debug,
router: router,
address: address,
cache: cache,
Expand Down Expand Up @@ -53,7 +51,7 @@ func (h *HttpHandler) Start(ctx context.Context) {
// Create a context with a 30-second timeout, and attempt to shut down HTTP services.
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
h.debug.Debugf("Shutting down HTTP services.")
slog.Debug("Shutting down HTTP services.")
srv.Shutdown(shutdownCtx)
}

Expand Down
27 changes: 3 additions & 24 deletions examples/cmd/markdownRenderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,10 @@ func main() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)

// Set up debug loggers. The first one, `debugLogger` is used by
// main.go and the http handlers. The remaining loggers are used
// by the `platform-lib` third-party libraries.
debugLogger := rslog.NewDebugLogger(RegionRenderer)
storeLogger := rslog.NewDebugLogger(RegionStore)
storageLogger := rslog.NewDebugLogger(RegionStorage)
notifyLogger := rslog.NewDebugLogger(RegionNotifications)
queueLogger := rslog.NewDebugLogger(RegionQueue)
cacheLogger := rslog.NewDebugLogger(RegionCache)
cacheNfsTimeLogger := rslog.NewDebugLogger(RegionCacheNfsTime)
agentLogger := rslog.NewDebugLogger(RegionAgent)
agentTraceLogger := rslog.NewDebugLogger(RegionAgentTrace)
agentJobLogger := rslog.NewDebugLogger(RegionAgentJob)

// Start a new local listener provider and factory. The provider knows how
// to create a new listener. The factory uses the provider to create new
// listeners when needed.
localListenerProvider := local.NewListenerProvider(local.ListenerProviderArgs{DebugLogger: notifyLogger})
localListenerProvider := local.NewListenerProvider(local.ListenerProviderArgs{})
localListenerFactory := local.NewListenerFactory(localListenerProvider)
defer localListenerFactory.Shutdown()

Expand Down Expand Up @@ -151,7 +137,7 @@ func main() {
// and is used by the database queue implementation. The store also includes the
// local listener provider to support sending notifications.
_ = os.Mkdir("data", 0755) // Create a data directory
exampleStore := store.Open("data/markdownRenderer.sqlite", localListenerProvider, storeLogger)
exampleStore := store.Open("data/markdownRenderer.sqlite", localListenerProvider)

// The file storage server supports chunked files, where large files are stored in a
// directory in "chunks" not larger than a configured size. The chunk waiter is used
Expand All @@ -169,7 +155,6 @@ func main() {
Waiter: waiter,
Notifier: notifier,
Class: "rendered",
DebugLogger: storageLogger,
CacheTimeout: storageCacheTimeout,
WalkTimeout: storageWalkTimeout,
})
Expand Down Expand Up @@ -205,7 +190,6 @@ func main() {
NotifyTypeWorkComplete: notifytypes.NotifyTypeWorkComplete,
NotifyTypeChunk: notifytypes.NotifyTypeChunk,
ChunkMatcher: chunkMatcher,
DebugLogger: queueLogger,
CarrierFactory: &metrics.EmptyCarrierFactory{},
QueueStore: exampleStore,
QueueMsgsChan: queueMessages,
Expand Down Expand Up @@ -236,8 +220,6 @@ func main() {
StorageServer: fileStorage,
Recurser: recurser,
Timeout: fileCacheTimeout,
DebugLogger: cacheLogger,
NfsTimeLogger: cacheNfsTimeLogger,
})

// Track supported types for the queue. This is useful for entering an offline mode,
Expand Down Expand Up @@ -286,9 +268,6 @@ func main() {
ConcurrencyEnforcer: cEnforcer,
SupportedTypes: supportedTypes,
NotificationsChan: queueMessages,
DebugLogger: agentLogger,
TraceLogger: agentTraceLogger,
JobLogger: agentJobLogger,
NotifyTypeWorkComplete: notifytypes.NotifyTypeWorkComplete,
JobLifecycleWrapper: &metrics.EmptyJobLifecycleWrapper{},
}
Expand All @@ -306,7 +285,7 @@ func main() {

// Start HTTP services and listen until the application exits.
router := mux.NewRouter()
handler := handlers.NewHttpHandler(debugLogger, address, router, cache)
handler := handlers.NewHttpHandler(address, router, cache)
ctx, cancel := context.WithCancel(context.Background())
go handler.Start(ctx)
// Cancel the handler's context when the application exits for graceful
Expand Down
9 changes: 3 additions & 6 deletions examples/cmd/markdownRenderer/store/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/mattn/go-sqlite3"
"github.com/rstudio/platform-lib/examples/cmd/markdownRenderer/notifytypes"
"github.com/rstudio/platform-lib/pkg/rslog"
"github.com/rstudio/platform-lib/pkg/rsnotify/listener"
"github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local"
"github.com/rstudio/platform-lib/pkg/rsnotify/listenerutils"
Expand Down Expand Up @@ -108,15 +108,14 @@ type queuedNotification struct {
type store struct {
db *gorm.DB
inTransaction bool
logger rslog.DebugLogger

// For local notification queuing in a transaction
mutex sync.Mutex
notifications []queuedNotification
llFactory *local.ListenerProvider
}

func Open(path string, llf *local.ListenerProvider, logger rslog.DebugLogger) Store {
func Open(path string, llf *local.ListenerProvider) Store {
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
panic("failed to connect database")
Expand All @@ -131,7 +130,6 @@ func Open(path string, llf *local.ListenerProvider, logger rslog.DebugLogger) St
return &store{
db: db,
llFactory: llf,
logger: logger,
}
}

Expand All @@ -141,7 +139,6 @@ func (conn *store) BeginTransaction(description string) (Store, error) {
inTransaction: true,
notifications: make([]queuedNotification, 0),
llFactory: conn.llFactory,
logger: conn.logger,
}, nil
}

Expand All @@ -159,7 +156,7 @@ func (conn *store) CompleteTransaction(err *error) {
// where queued during the transaction.
if finErr == nil && conn.llFactory != nil {
for _, n := range conn.notifications {
conn.logger.Debugf("Notifying %s of available work: %#v", n.channel, n.n)
slog.Debug(fmt.Sprintf("Notifying %s of available work: %#v", n.channel, n.n))
conn.llFactory.Notify(n.channel, n.n)
}
}
Expand Down
10 changes: 2 additions & 8 deletions examples/cmd/markdownRenderer/store/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os"
"testing"

"github.com/rstudio/platform-lib/pkg/rslog"
"github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local"
"github.com/rstudio/platform-lib/pkg/rsqueue/permit"
"github.com/rstudio/platform-lib/pkg/rsqueue/queue"
Expand All @@ -31,20 +30,15 @@ type QueueSqliteSuite struct {
tmp string
}

type fakeDebugLogger struct{}

func (*fakeDebugLogger) Debugf(msg string, args ...interface{}) {}
func (*fakeDebugLogger) Enabled() bool { return false }

var _ = check.Suite(&QueueSqliteSuite{})

func (s *QueueSqliteSuite) SetUpTest(c *check.C) {
tmp, err := ioutil.TempFile("", "")
c.Assert(err, check.IsNil)
s.tmp = tmp.Name()

llf := local.NewListenerProvider(local.ListenerProviderArgs{DebugLogger: &fakeDebugLogger{}})
s.store = Open(s.tmp, llf, rslog.NewDebugLogger(0))
llf := local.NewListenerProvider(local.ListenerProviderArgs{})
s.store = Open(s.tmp, llf)
}

func (s *QueueSqliteSuite) TearDownTest(c *check.C) {
Expand Down
4 changes: 2 additions & 2 deletions examples/cmd/testnotify/cmd/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func setup(drv string) (string, listenerfactory.ListenerFactory, *local.Listener
return drv, nil, nil, fmt.Errorf("error connecting to pool: %s", err)
}
ipReporter := postgrespgx.NewPgxIPReporter(pool)
fact = postgrespgx.NewListenerFactory(postgrespgx.ListenerFactoryArgs{Pool: pool, DebugLogger: debugLogger, IpReporter: ipReporter})
fact = postgrespgx.NewListenerFactory(postgrespgx.ListenerFactoryArgs{Pool: pool, IpReporter: ipReporter})
case "pq":
drv = "postgres"
pqListenerFactory := &pqlistener{ConnStr: connstr}
fact = postgrespq.NewListenerFactory(postgrespq.ListenerFactoryArgs{Factory: pqListenerFactory, DebugLogger: debugLogger})
fact = postgrespq.NewListenerFactory(postgrespq.ListenerFactoryArgs{Factory: pqListenerFactory})
default:
return drv, nil, nil, fmt.Errorf("invalid --driver argument value")
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/rscache/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import (
"context"
"fmt"
"io"
"log/slog"
"time"

"github.com/rstudio/platform-lib/pkg/rsstorage"
"github.com/rstudio/platform-lib/pkg/rsstorage/types"
)

type DebugLogger interface {
Debugf(msg string, args ...interface{})
Enabled() bool
}

type QueueWork interface {
Type() uint64
}
Expand Down Expand Up @@ -52,8 +48,6 @@ type FileCacheConfig struct {
StorageServer rsstorage.StorageServer
Recurser OptionalRecurser
Timeout time.Duration
DebugLogger DebugLogger
NfsTimeLogger DebugLogger
}

func NewFileCache(cfg FileCacheConfig) FileCache {
Expand All @@ -63,8 +57,6 @@ func NewFileCache(cfg FileCacheConfig) FileCache {
server: cfg.StorageServer,
timeout: cfg.Timeout,
recurser: cfg.Recurser,
debugLogger: cfg.DebugLogger,
nfsTimeLogger: cfg.NfsTimeLogger,

retry: time.Millisecond * 200,
}
Expand All @@ -87,10 +79,6 @@ type fileCache struct {

// retry delay
retry time.Duration

// Loggers
debugLogger DebugLogger
nfsTimeLogger DebugLogger
}

func (o *fileCache) retryingGet(dir, address string, get func() bool) bool {
Expand All @@ -114,9 +102,9 @@ func (o *fileCache) retryingGet(dir, address string, get func() bool) bool {
// Preemptive get attempt
if flushingGet() {
if flushed == 0 {
o.nfsTimeLogger.Debugf("Found cached item at address '%s' immediately", address)
slog.Log(context.Background(), LevelTrace, fmt.Sprintf("Found cached item at address '%s' immediately", address))
} else {
o.nfsTimeLogger.Debugf("Found cached item at address '%s' after one flush", address)
slog.Log(context.Background(), LevelTrace, fmt.Sprintf("Found cached item at address '%s' after one flush", address))
}
return true
}
Expand All @@ -130,7 +118,7 @@ func (o *fileCache) retryingGet(dir, address string, get func() bool) bool {
case <-retry.C:
if flushingGet() {
elapsed := time.Now().Sub(start) / 1000000
o.nfsTimeLogger.Debugf("Found cached item at address '%s' after %d ms and %d flushes", address, elapsed, flushed)
slog.Log(context.Background(), LevelTrace, fmt.Sprintf("Found cached item at address '%s' after %d ms and %d flushes", address, elapsed, flushed))
return true
}
case <-timeout.C:
Expand Down Expand Up @@ -187,7 +175,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
err = o.queue.AddressedPush(resolver.Priority, resolver.GroupId, resolver.Address(), resolver.Work)
if o.duplicateMatcher.IsDuplicate(err) {
// Do nothing since; someone else has already inserted the work we need.
o.debugLogger.Debugf("FileCache: duplicate address push for '%s'", resolver.Address())
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", resolver.Address()))
} else if err != nil {
return
}
Expand All @@ -205,7 +193,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
if o.retryingGet(resolver.Dir(), resolver.Address(), head) {
return
} else {
o.debugLogger.Debugf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address())
slog.Debug(fmt.Sprintf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address()))
err = fmt.Errorf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address())
return
}
Expand Down Expand Up @@ -274,7 +262,7 @@ func (o *fileCache) Get(ctx context.Context, resolver ResolverSpec) (value *Cach
err = o.queue.AddressedPush(resolver.Priority, resolver.GroupId, address, resolver.Work)
if o.duplicateMatcher.IsDuplicate(err) {
// Do nothing since; someone else has already inserted the work we need.
o.debugLogger.Debugf("FileCache: duplicate address push for '%s'", address)
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", address))
} else if err != nil {
value = &CacheReturn{
Err: err,
Expand All @@ -297,7 +285,7 @@ func (o *fileCache) Get(ctx context.Context, resolver ResolverSpec) (value *Cach
return
} else {
err = fmt.Errorf("error: FileCache reported address '%s' complete, but item was not found in cache", address)
o.debugLogger.Debugf(err.Error())
slog.Debug(fmt.Sprintf(err.Error()))
return
}
}
Expand Down
Loading

0 comments on commit 9b3d440

Please sign in to comment.