diff --git a/main.go b/main.go index 347cdd3ef..b7dde6586 100644 --- a/main.go +++ b/main.go @@ -105,10 +105,6 @@ func runMemphis(s *server.Server) db.DbInstance { s.Fatalf("Jetstream not enabled on global account") } - if err := s.CreateSystemLogsStream(); err != nil { - s.Fatalf("Failed to create syslogs stream: " + " " + err.Error()) - } - dbInstance, err := db.InitializeDbConnection(s) if err != nil { s.Errorf("Failed initializing db connection:" + " " + err.Error()) @@ -121,6 +117,7 @@ func runMemphis(s *server.Server) db.DbInstance { } s.InitializeMemphisHandlers(dbInstance) + go s.CreateSystemLogsStream() err = server.CreateRootUserOnFirstSystemLoad() if err != nil { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 816f6dc45..31c45d14a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -953,6 +953,7 @@ func (js *jetStream) monitorCluster() { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) } js.processLeaderChange(isLeader) + js.srv.memphisClusterReady() if isLeader && !beenLeader { beenLeader = true if n.NeedSnapshot() { diff --git a/server/memphis_handlers.go b/server/memphis_handlers.go index d07292fdc..b4e8ddab0 100644 --- a/server/memphis_handlers.go +++ b/server/memphis_handlers.go @@ -67,6 +67,9 @@ type srvMemphis struct { dbCtx context.Context dbCancel context.CancelFunc activateSysLogsPubFunc func() + mcrReported bool + mcr chan struct{} // memphis cluster ready + logStreamCreated bool } func (s *Server) InitializeMemphisHandlers(dbInstance db.DbInstance) { @@ -76,6 +79,9 @@ func (s *Server) InitializeMemphisHandlers(dbInstance db.DbInstance) { s.memphis.dbCancel = dbInstance.Cancel s.memphis.nuid = nuid.New() s.memphis.serverID = configuration.SERVER_NAME + s.memphis.mcrReported = false + s.memphis.mcr = make(chan struct{}) + s.memphis.logStreamCreated = false usersCollection = db.GetCollection("users", dbInstance.Client) imagesCollection = db.GetCollection("images", dbInstance.Client) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 7af6b273f..e041edf9d 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -173,10 +173,36 @@ const ( syslogsErrSubject = "err" ) -func (s *Server) CreateSystemLogsStream() error { +func (s *Server) memphisClusterReady() { + if !s.memphis.mcrReported { + s.memphis.mcrReported = true + close(s.memphis.mcr) + } +} + +func (s *Server) CreateSystemLogsStream() { + if s.JetStreamIsClustered() { + if !s.memphis.logStreamCreated { + timeout := time.NewTimer(2 * time.Minute) + select { + case <-timeout.C: + s.Fatalf("Failed to create syslogs stream: cluster readiness timeout") + case <-s.memphis.mcr: + timeout.Stop() + s.memphis.logStreamCreated = true + } + + if !s.JetStreamIsLeader() { + return + } + } + + } + retentionDays, err := strconv.Atoi(configuration.LOGS_RETENTION_IN_DAYS) if err != nil { - return err + s.Fatalf("Failed to create syslogs stream: " + " " + err.Error()) + } retentionDur := time.Duration(retentionDays) * time.Hour * 24 @@ -190,14 +216,14 @@ func (s *Server) CreateSystemLogsStream() error { Storage: FileStorage, }) - if err == nil { - if s.memphis.activateSysLogsPubFunc == nil { - return errors.New("publish activation func is not initialised") - } - s.memphis.activateSysLogsPubFunc() + if err != nil { + s.Fatalf("Failed to create syslogs stream: " + " " + err.Error()) } - return err + if s.memphis.activateSysLogsPubFunc == nil { + s.Fatalf("publish activation func is not initialised") + } + s.memphis.activateSysLogsPubFunc() } func (s *Server) memphisAddStream(sc *StreamConfig) error {