Skip to content

Commit

Permalink
storage: close db after writing (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
1023280072 committed Sep 6, 2023
1 parent c24047f commit 55d4162
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 176 deletions.
16 changes: 8 additions & 8 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ var startCmd = &cobra.Command{
QuorumPort: viper.GetInt("quorum-port"),
DataStoreConfig: tcpmon.DataStoreConfig{
Path: viper.GetString("db"),
MaxSize: viper.GetInt("db-max-size"),
GcInterval: viper.GetDuration("db-gc-interval"),
ReclaimBatch: viper.GetInt("reclaim-batch"),
ReclaimInterval: viper.GetDuration("reclaim-interval"),
MaxSize: viper.GetUint32("db-max-size"),
WriteInterval: viper.GetDuration("db-write-interval"),
ExpectedRss: viper.GetUint64("db-expected-rss"),
MinOpenInterval: viper.GetDuration("db-min-open-interval"),
},
})
if err != nil {
Expand Down Expand Up @@ -80,10 +80,10 @@ func init() {

// db flags
startCmd.PersistentFlags().String("db", "/tmp/tcpmon/db", "Database path")
startCmd.PersistentFlags().Int("db-max-size", 30000, "Maximum number of records in the database")
startCmd.PersistentFlags().Duration("db-gc-interval", 10*time.Minute, "BadgerDB value GC interval")
startCmd.PersistentFlags().Int("reclaim-batch", 2000, "Maximum number of reclaiming per batch")
startCmd.PersistentFlags().Duration("reclaim-interval", 3*time.Minute, "Reclaiming interval")
startCmd.PersistentFlags().Uint32("db-max-size", 2000000, "Maximum number of records in the database")
startCmd.PersistentFlags().Duration("db-write-interval", 3*time.Second, "Write interval")
startCmd.PersistentFlags().Uint64("db-expected-rss", 200<<20, "Expected maximum RSS")
startCmd.PersistentFlags().Duration("db-min-open-interval", 1*time.Minute, "Database reopen interval")

fatalIf(viper.BindPFlags(startCmd.PersistentFlags()))
rootCmd.AddCommand(startCmd)
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package main

import (
"os"
"runtime/debug"
"strings"

"github.com/zperf/tcpmon/cmd"
)

func main() {
debug.SetMemoryLimit(100 << 20)
cmd.Execute(strings.Join(os.Args[1:], " "))
}
1 change: 0 additions & 1 deletion rpm/tcpmon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ cmd-ss2: /usr/sbin/ss
cmd-timeout: 1s
collect-interval: 1s
db: /var/lib/tcpmon
db-gc-interval: 10m0s
db-max-size: 2000000
disable-console: true
listen: 0.0.0.0:6789
Expand Down
11 changes: 2 additions & 9 deletions tcpmon/http_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,10 @@ import (

func GetMember(q *Quorum) func(c *gin.Context) {
return func(c *gin.Context) {
members := make(map[string]any)
members := make([]string, 0)
for _, member := range q.Members() {
addr := member.Address()
minfo, err := q.GetMemberMeta(addr)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, ErrorJSON(err))
return
}
members[member.Address()] = minfo
members = append(members, member.Address())
}

c.JSON(http.StatusOK, gin.H{
"len": len(members),
"members": members,
Expand Down
4 changes: 2 additions & 2 deletions tcpmon/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

func RegisterRoutes(router *gin.Engine, mon *Monitor) {
router.GET("/metrics", GetMetrics(mon.datastore))
router.GET("/metrics/*name", GetMetric(mon.datastore))
// router.GET("/metrics", GetMetrics(mon.datastore))
// router.GET("/metrics/*name", GetMetric(mon.datastore))
router.GET("/members", GetMember(mon.quorum))
router.POST("/members", JoinCluster(mon.quorum))
router.POST("/members/leave", LeaveCluster(mon.quorum))
Expand Down
12 changes: 6 additions & 6 deletions tcpmon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ func (m *Monitor) Run(ctx context.Context) error {

m.startHttpServer(m.config.HttpListen)

initialMembers, err := m.datastore.GetMemberAddressList()
if err != nil {
log.Info().Err(err).Msg("Get members from db failed")
} else if len(initialMembers) != 0 {
m.quorum.Join(initialMembers)
}
// initialMembers, err := m.datastore.GetMemberAddressList()
// if err != nil {
// log.Info().Err(err).Msg("Get members from db failed")
// } else if len(initialMembers) != 0 {
// m.quorum.Join(initialMembers)
// }

for {
select {
Expand Down
32 changes: 16 additions & 16 deletions tcpmon/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func NewQuorum(ds *DataStore, mconfig *MonitorConfig) *Quorum {
log.Fatal().Err(err).Msg("Marshal member info failed")
}
my.Meta = buf
err = ds.UpdateMember(my.Address(), buf)
if err != nil {
log.Fatal().Err(err).Msg("Update my member info failed")
}
// err = ds.UpdateMember(my.Address(), buf)
// if err != nil {
// log.Fatal().Err(err).Msg("Update my member info failed")
// }

return q
}
Expand All @@ -64,26 +64,26 @@ func (q *Quorum) Close() {

func (q *Quorum) NotifyJoin(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Member joined")
err := q.ds.AddMember(node.Address(), node.Meta)
if err != nil {
log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
}
// err := q.ds.AddMember(node.Address(), node.Meta)
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
// }
}

func (q *Quorum) NotifyLeave(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Member left")
err := q.ds.DeleteMember(node.Address())
if err != nil {
log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
}
// err := q.ds.DeleteMember(node.Address())
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Save member failed")
// }
}

func (q *Quorum) NotifyUpdate(node *memberlist.Node) {
log.Info().Str("node", node.Address()).Msgf("Update member meta data")
err := q.ds.UpdateMember(node.Address(), node.Meta)
if err != nil {
log.Warn().Err(err).Str("node", node.Address()).Msg("Update member failed")
}
// err := q.ds.UpdateMember(node.Address(), node.Meta)
// if err != nil {
// log.Warn().Err(err).Str("node", node.Address()).Msg("Update member failed")
// }
}

func (q *Quorum) Local() *memberlist.Node {
Expand Down
Loading

0 comments on commit 55d4162

Please sign in to comment.