Skip to content

Commit

Permalink
iter23: Graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr Kosygin committed Jul 15, 2024
1 parent 127fe0c commit 7bc663c
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 28 deletions.
8 changes: 6 additions & 2 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"syscall"

"github.com/kosalnik/metrics/internal/application/client"
"github.com/kosalnik/metrics/internal/config"
"github.com/kosalnik/metrics/internal/graceful"
"github.com/kosalnik/metrics/internal/log"
"github.com/kosalnik/metrics/internal/version"
)
Expand Down Expand Up @@ -41,6 +43,8 @@ func main() {
}
ctx := context.Background()
app := client.NewClient(ctx, cfg.Agent)
app.Run(ctx)

graceful.
NewManager(app).
Notify(syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM).
Run(ctx)
}
11 changes: 7 additions & 4 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ package main
import (
"context"
"os"
"syscall"

"github.com/kosalnik/metrics/internal/application/server"
"github.com/kosalnik/metrics/internal/config"
"github.com/kosalnik/metrics/internal/graceful"
"github.com/kosalnik/metrics/internal/log"
"github.com/kosalnik/metrics/internal/version"
)
Expand All @@ -23,6 +25,7 @@ func main() {
BuildDate: buildDate,
BuildCommit: buildCommit,
}.Print(os.Stdout)
ctx := context.Background()
cfg := config.NewConfig()
if err := config.ParseServerFlags(os.Args, &cfg.Server); err != nil {
panic(err.Error())
Expand All @@ -31,8 +34,8 @@ func main() {
if err := log.InitLogger(cfg.Server.Logger.Level); err != nil {
panic(err.Error())
}
err := app.Run(context.Background())
if err != nil {
log.Panic().Err(err).Msg("panic")
}
graceful.
NewManager(app).
Notify(syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM).
Run(ctx)
}
43 changes: 35 additions & 8 deletions internal/application/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kosalnik/metrics/internal/log"
"github.com/kosalnik/metrics/internal/metric"
"github.com/kosalnik/metrics/internal/models"
"golang.org/x/sync/errgroup"
)

type Client struct {
Expand All @@ -35,23 +36,49 @@ func NewClient(ctx context.Context, config config.Agent) *Client {
}
}

func (c *Client) Run(ctx context.Context) {
func (c *Client) Run(ctx context.Context) error {
g := errgroup.Group{}
g.Go(func() error {
return c.RunMetricsSender(ctx)
})
g.Go(func() error {
return c.RunMetricsPoller(ctx)
})
return g.Wait()
}

// RunMetricsSender Запустить процесс периодической отправки метрик в коллектор
func (c *Client) RunMetricsSender(ctx context.Context) error {
log.Info().
Int64("Poll interval", c.config.PollInterval).
Int64("Report interval", c.config.ReportInterval).
Str("Collector address", c.config.CollectorAddress).
Msg("Running agent")
go c.poll(ctx)
c.push(ctx)
return c.push(ctx)
}

// RunMetricsPoller Запустить процесс периодического сбора метрик
func (c *Client) RunMetricsPoller(ctx context.Context) error {
log.Info().
Int64("Poll interval", c.config.PollInterval).
Msg("Running metrics poll process")
return c.poll(ctx)
}

func (c *Client) Shutdown(ctx context.Context) {
log.Info().Msg("Shutdown service Client")
if err := c.sender.SendBatch(ctx, c.collectMetrics()); err != nil {
log.Error().Err(err).Msg("fail push")
}
log.Info().Msg("Shutdown service Client completed")
}

func (c *Client) push(ctx context.Context) {
func (c *Client) push(ctx context.Context) error {
tick := time.NewTicker(time.Duration(c.config.ReportInterval) * time.Second)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-tick.C:
log.Info().Msg("Push")
if err := c.sender.SendBatch(ctx, c.collectMetrics()); err != nil {
Expand Down Expand Up @@ -85,13 +112,13 @@ func (c *Client) collectMetrics() []models.Metrics {
return list
}

func (c *Client) poll(ctx context.Context) {
func (c *Client) poll(ctx context.Context) error {
tick := time.NewTicker(time.Duration(c.config.PollInterval) * time.Second)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-tick.C:
if err := c.pollMetrics(ctx); err != nil {
log.Error().Err(err).Msg("poll error")
Expand Down
12 changes: 8 additions & 4 deletions internal/application/client/sender_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ type SenderRest struct {
}

func NewSenderRest(config *config.Agent) Sender {
c := http.Client{
Transport: crypt.NewCipherInterceptor(
var transport http.RoundTripper = crypt.VerifyHashInterceptor(config.Hash, http.DefaultTransport)
if config.PublicKey != nil {
transport = crypt.NewCipherInterceptor(
crypt.NewEncoder(config.PublicKey, rand.Reader),
crypt.VerifyHashInterceptor(config.Hash, http.DefaultTransport),
),
transport,
)
}
c := http.Client{
Transport: transport,
}
return &SenderRest{
client: &c,
Expand Down
60 changes: 52 additions & 8 deletions internal/application/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"golang.org/x/sync/errgroup"

"github.com/kosalnik/metrics/internal/backup"
"github.com/kosalnik/metrics/internal/config"
Expand All @@ -23,8 +24,10 @@ import (
)

type App struct {
Storage storage.Storage
config config.Server
Storage storage.Storage
config config.Server
server *http.Server
backupManager *backup.BackupManager
}

func NewApp(cfg config.Server) *App {
Expand All @@ -46,9 +49,45 @@ func (app *App) Run(ctx context.Context) error {
}
}()

app.server = &http.Server{
Addr: app.config.Address,
Handler: app.GetRouter(),
}

log.Info().Str("address", app.config.Address).Msg("Listen")
return app.server.ListenAndServe()
}

return http.ListenAndServe(app.config.Address, app.GetRouter())
func (app *App) Shutdown(ctx context.Context) {
log.Info().Msg(`Shutdown start`)
g := errgroup.Group{}
g.Go(func() (err error) {
log.Info().Msg(`Shutdown "server.App" start`)
defer func() {
if err != nil {
log.Error().Err(err).Msg(`Shutdown "server.App" error`)
} else {
log.Info().Msg(`Shutdown "server.App" completed`)
}
}()
return app.server.Shutdown(ctx)
})
g.Go(func() (err error) {
log.Info().Msg(`Shutdown "backupManager" start`)
defer func() {
if err != nil {
log.Error().Err(err).Msg(`Shutdown "backupManager" error`)
} else {
log.Info().Msg(`Shutdown "backupManager" completed`)
}
}()
err = app.backupManager.Store(ctx)
return
})
if err := g.Wait(); err != nil {
log.Error().Err(err).Msg("Shutdown error")
}
log.Info().Msg(`Shutdown completed`)
}

func (app *App) initStorage(ctx context.Context) error {
Expand Down Expand Up @@ -81,33 +120,38 @@ func (app *App) initDB(ctx context.Context) error {
// Будет скидывать бекап на диск через равные промежутки времени.
func (app *App) initBackup(ctx context.Context) error {
var err error
bm, err := backup.NewBackupManager(app.Storage, app.config.Backup)
app.backupManager, err = backup.NewBackupManager(app.Storage, app.config.Backup)
if err != nil {
return err
}
if err = bm.Recover(ctx); err != nil {
if err = app.backupManager.Recover(ctx); err != nil {
if !os.IsNotExist(err) {
return err
}
}

log.Info().Msg("schedule backup")
go bm.BackupLoop(ctx)
go app.backupManager.BackupLoop(ctx)

return nil
}

func (app *App) GetRouter() chi.Router {
r := chi.NewRouter()

if app.config.PrivateKey != nil {
r.Use(crypt.CipherMiddleware(crypt.NewDecoder(app.config.PrivateKey, rand.Reader)))
}

r.Use(
crypt.CipherMiddleware(crypt.NewDecoder(app.config.PrivateKey, rand.Reader)),
middleware.Compress(1, "application/json", "text/html"),
//gzipMiddleware,
middleware.Logger,
middleware.Recoverer,
crypt.HashCheckMiddleware(app.config.Hash),
)

requireJSONMw := middleware.AllowContentType("application/json")

r.Route("/", func(r chi.Router) {
r.With(requireJSONMw).Get("/", handlers.NewGetAllHandler(app.Storage))
r.With(requireJSONMw).Post("/updates/", handlers.NewUpdateBatchHandler(app.Storage))
Expand Down
14 changes: 13 additions & 1 deletion internal/backup/backupmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,25 @@ func (m *BackupManager) BackupLoop(ctx context.Context) {
continue
}
log.Info().Msg("backup loop: store")
if err := m.dump.Store(ctx); err != nil {
if err := m.Store(ctx); err != nil {
log.Error().Err(err).Msg("Fail backup")
}
}
}
}

func (m *BackupManager) Store(ctx context.Context) (err error) {
log.Info().Msg("Backup start")
defer func() {
if err != nil {
log.Error().Err(err).Msg("Backup error")
} else {
log.Info().Msg("Backup completed")
}
}()
return m.dump.Store(ctx)
}

// Recover - восстановить данные из бекапа.
func (m *BackupManager) Recover(ctx context.Context) error {
if m == nil || m.recover == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/config/agent_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func ParseAgentFlags(args []string, c *Agent) error {
publicKeyFile = &v
}

if publicKeyFile != nil {
if publicKeyFile != nil && *publicKeyFile != "" {
publicKeyPEM, err := os.ReadFile(*publicKeyFile)
if err != nil {
return fmt.Errorf("fail to read key: %w", err)
Expand Down
Loading

0 comments on commit 7bc663c

Please sign in to comment.