Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding pprof flag to support better tracing & rewriting file watching #97

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ var SaveSnapshotFlag = flag.Bool("save-snapshot", false, "Only take snapshot and
var ClusterAddrFlag = flag.String("cluster-addr", "", "Cluster listening address")
var ClusterPeersFlag = flag.String("cluster-peers", "", "Comma separated list of clusters")
var LeafServerFlag = flag.String("leaf-servers", "", "Comma separated list of leaf servers")
var ProfServer = flag.String("pprof", "", "PProf listening address")

var DataRootDir = os.TempDir()
var Config = &Configuration{
Expand Down
28 changes: 24 additions & 4 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,34 +253,54 @@ func (conn *SqliteStreamDB) initTriggers(tableName string) error {
return nil
}

func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher *fsnotify.Watcher) {
for {
select {
case ev, ok := <-watcher.Events:
if !ok {
close(changed)
return
}

if ev.Op == fsnotify.Chmod {
continue
}

changed <- ev
}
}
}

func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) {
shmPath := path + "-shm"
walPath := path + "-wal"

errDB := watcher.Add(path)
errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
dbChanged := make(chan fsnotify.Event)

tickerDur := time.Duration(cfg.Config.PollingInterval) * time.Millisecond
changeLogTicker := utils.NewTimeoutPublisher(tickerDur)

// Publish change logs for any residual change logs before starting watcher
conn.publishChangeLog()
go conn.filterChangesTo(dbChanged, watcher)

for {
changeLogTicker.Reset()

err := conn.WithReadTx(func(_tx *sql.Tx) error {
select {
case ev, ok := <-watcher.Events:
case ev, ok := <-dbChanged:
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the point of adding another channel and doing the same thing that we are doing in the body of the function. This one new channel + go routine might not be a big deal, but unless there is sophisticated logic adding such additional go routines and channels is just gonna keep on increasing memory overhead.

if !ok {
return ErrEndOfWatch
}

if ev.Op != fsnotify.Chmod {
conn.publishChangeLog()
}
log.Debug().Int("change", int(ev.Op)).Msg("Change detected")
conn.publishChangeLog()
case <-changeLogTicker.Channel():
log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout")
conn.publishChangeLog()
}

Expand Down
20 changes: 19 additions & 1 deletion marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"flag"
"io"
"net/http"
"net/http/pprof"
_ "net/http/pprof"
"os"
"time"

Expand All @@ -22,7 +25,6 @@ import (

func main() {
flag.Parse()

err := cfg.Load(*cfg.ConfigPathFlag)
if err != nil {
panic(err)
Expand All @@ -44,6 +46,22 @@ func main() {
log.Logger = gLog.Level(zerolog.InfoLevel)
}

if *cfg.ProfServer != "" {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
maxpert marked this conversation as resolved.
Show resolved Hide resolved
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

err := http.ListenAndServe(*cfg.ProfServer, mux)
if err != nil {
log.Error().Err(err).Msg("unable to bind profiler server")
}
}()
}

log.Debug().Msg("Initializing telemetry")
telemetry.InitializeTelemetry()

Expand Down
Loading