Skip to content

Commit

Permalink
Delay logs stream creation until after cluster is stable
Browse files Browse the repository at this point in the history
  • Loading branch information
OrMemphis committed Sep 19, 2022
1 parent 8160a45 commit 9ccda5f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
5 changes: 1 addition & 4 deletions main.go
Expand Up @@ -105,10 +105,6 @@ func runMemphis(s *server.Server) db.DbInstance {
s.Fatalf("Jetstream not enabled on global account")
}

if err := s.CreateSystemLogsStream(); err != nil {
s.Fatalf("Failed to create syslogs stream: " + " " + err.Error())
}

dbInstance, err := db.InitializeDbConnection(s)
if err != nil {
s.Errorf("Failed initializing db connection:" + " " + err.Error())
Expand All @@ -121,6 +117,7 @@ func runMemphis(s *server.Server) db.DbInstance {
}

s.InitializeMemphisHandlers(dbInstance)
go s.CreateSystemLogsStream()

err = server.CreateRootUserOnFirstSystemLoad()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/jetstream_cluster.go
Expand Up @@ -953,6 +953,7 @@ func (js *jetStream) monitorCluster() {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
}
js.processLeaderChange(isLeader)
js.srv.memphisClusterReady()
if isLeader && !beenLeader {
beenLeader = true
if n.NeedSnapshot() {
Expand Down
6 changes: 6 additions & 0 deletions server/memphis_handlers.go
Expand Up @@ -67,6 +67,9 @@ type srvMemphis struct {
dbCtx context.Context
dbCancel context.CancelFunc
activateSysLogsPubFunc func()
mcrReported bool
mcr chan struct{} // memphis cluster ready
logStreamCreated bool
}

func (s *Server) InitializeMemphisHandlers(dbInstance db.DbInstance) {
Expand All @@ -76,6 +79,9 @@ func (s *Server) InitializeMemphisHandlers(dbInstance db.DbInstance) {
s.memphis.dbCancel = dbInstance.Cancel
s.memphis.nuid = nuid.New()
s.memphis.serverID = configuration.SERVER_NAME
s.memphis.mcrReported = false
s.memphis.mcr = make(chan struct{})
s.memphis.logStreamCreated = false

usersCollection = db.GetCollection("users", dbInstance.Client)
imagesCollection = db.GetCollection("images", dbInstance.Client)
Expand Down
42 changes: 34 additions & 8 deletions server/memphis_helper.go
Expand Up @@ -173,10 +173,36 @@ const (
syslogsErrSubject = "err"
)

func (s *Server) CreateSystemLogsStream() error {
func (s *Server) memphisClusterReady() {
if !s.memphis.mcrReported {
s.memphis.mcrReported = true
close(s.memphis.mcr)
}
}

func (s *Server) CreateSystemLogsStream() {
if s.JetStreamIsClustered() {
if !s.memphis.logStreamCreated {
timeout := time.NewTimer(2 * time.Minute)
select {
case <-timeout.C:
s.Fatalf("Failed to create syslogs stream: cluster readiness timeout")
case <-s.memphis.mcr:
timeout.Stop()
s.memphis.logStreamCreated = true
}

if !s.JetStreamIsLeader() {
return
}
}

}

retentionDays, err := strconv.Atoi(configuration.LOGS_RETENTION_IN_DAYS)
if err != nil {
return err
s.Fatalf("Failed to create syslogs stream: " + " " + err.Error())

}
retentionDur := time.Duration(retentionDays) * time.Hour * 24

Expand All @@ -190,14 +216,14 @@ func (s *Server) CreateSystemLogsStream() error {
Storage: FileStorage,
})

if err == nil {
if s.memphis.activateSysLogsPubFunc == nil {
return errors.New("publish activation func is not initialised")
}
s.memphis.activateSysLogsPubFunc()
if err != nil {
s.Fatalf("Failed to create syslogs stream: " + " " + err.Error())
}

return err
if s.memphis.activateSysLogsPubFunc == nil {
s.Fatalf("publish activation func is not initialised")
}
s.memphis.activateSysLogsPubFunc()
}

func (s *Server) memphisAddStream(sc *StreamConfig) error {
Expand Down

0 comments on commit 9ccda5f

Please sign in to comment.