Skip to content

Commit

Permalink
feat: migrate other services to use cases (2nd iteration) (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeii committed Mar 27, 2024
1 parent f6f24b9 commit 7919d83
Show file tree
Hide file tree
Showing 32 changed files with 1,419 additions and 1,016 deletions.
2 changes: 0 additions & 2 deletions cmd/swat4master/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/sergeii/swat4master/cmd/swat4master/container"
"github.com/sergeii/swat4master/cmd/swat4master/logging"
"github.com/sergeii/swat4master/cmd/swat4master/persistence"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/services/monitoring"
"github.com/sergeii/swat4master/internal/services/probe"
"github.com/sergeii/swat4master/internal/validation"
Expand All @@ -20,7 +19,6 @@ var Module = fx.Module("application",
fx.Provide(validation.New),
fx.Provide(persistence.Provide),
fx.Provide(monitoring.NewMetricService),
fx.Provide(finding.NewService),
fx.Provide(probe.NewService),
container.Module,
)
39 changes: 27 additions & 12 deletions cmd/swat4master/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/internal/core/usecases/addserver"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/core/usecases/getserver"
"github.com/sergeii/swat4master/internal/core/usecases/listservers"
"github.com/sergeii/swat4master/internal/core/usecases/refreshservers"
"github.com/sergeii/swat4master/internal/core/usecases/removeserver"
"github.com/sergeii/swat4master/internal/core/usecases/renewserver"
"github.com/sergeii/swat4master/internal/core/usecases/reportserver"
"github.com/sergeii/swat4master/internal/core/usecases/reviveservers"
)

type Container struct {
GetServer getserver.UseCase
AddServer addserver.UseCase
ListServers listservers.UseCase
ReportServer reportserver.UseCase
RenewServer renewserver.UseCase
RemoveServer removeserver.UseCase
GetServer getserver.UseCase
AddServer addserver.UseCase
ListServers listservers.UseCase
ReportServer reportserver.UseCase
RenewServer renewserver.UseCase
RemoveServer removeserver.UseCase
CleanServers cleanservers.UseCase
RefreshServers refreshservers.UseCase
ReviveServers reviveservers.UseCase
}

func New(
Expand All @@ -27,14 +33,20 @@ func New(
reportServerUseCase reportserver.UseCase,
renewServerUseCase renewserver.UseCase,
removeServerUseCase removeserver.UseCase,
cleanServersUseCase cleanservers.UseCase,
refreshServersUseCase refreshservers.UseCase,
reviveServersUseCase reviveservers.UseCase,
) Container {
return Container{
GetServer: getServerUseCase,
AddServer: addServerUseCase,
ListServers: listServersUseCase,
ReportServer: reportServerUseCase,
RenewServer: renewServerUseCase,
RemoveServer: removeServerUseCase,
GetServer: getServerUseCase,
AddServer: addServerUseCase,
ListServers: listServersUseCase,
ReportServer: reportServerUseCase,
RenewServer: renewServerUseCase,
RemoveServer: removeServerUseCase,
CleanServers: cleanServersUseCase,
RefreshServers: refreshServersUseCase,
ReviveServers: reviveServersUseCase,
}
}

Expand All @@ -45,5 +57,8 @@ var Module = fx.Module("container",
fx.Provide(reportserver.New),
fx.Provide(renewserver.New),
fx.Provide(removeserver.New),
fx.Provide(cleanservers.New),
fx.Provide(refreshservers.New),
fx.Provide(reviveservers.New),
fx.Provide(New),
)
40 changes: 27 additions & 13 deletions cmd/swat4master/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cleaner

import (
"context"
"time"

"github.com/jonboulle/clockwork"
"github.com/rs/zerolog"
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/cleaning"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Cleaner struct{}
Expand All @@ -18,7 +20,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *cleaning.Service,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
cfg config.Config,
) {
ticker := clock.NewTicker(cfg.CleanInterval)
Expand All @@ -39,11 +42,7 @@ func Run(
close(stopped)
return
case <-tickerCh:
if err := service.Clean(ctx, clock.Now().Add(-cfg.CleanRetention)); err != nil {
logger.Error().
Err(err).
Msg("Failed to clean outdated servers")
}
clean(ctx, clock, logger, metrics, uc, cfg.CleanRetention)
}
}
}
Expand All @@ -52,15 +51,16 @@ func NewCleaner(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *cleaning.Service,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
logger *zerolog.Logger,
) *Cleaner {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -73,10 +73,24 @@ func NewCleaner(
return &Cleaner{}
}

func clean(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
retention time.Duration,
) {
resp, err := uc.Execute(ctx, clock.Now().Add(-retention))
if err != nil {
logger.Error().
Err(err).
Msg("Failed to clean outdated servers")
}
metrics.CleanerRemovals.Add(float64(resp.Count))
metrics.CleanerErrors.Add(float64(resp.Errors))
}

var Module = fx.Module("cleaner",
fx.Provide(
fx.Private,
cleaning.NewService,
),
fx.Provide(NewCleaner),
)
25 changes: 16 additions & 9 deletions cmd/swat4master/modules/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/core/usecases/refreshservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Refresher struct{}
Expand All @@ -18,7 +19,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
cfg config.Config,
) {
refresher := clock.NewTicker(cfg.DiscoveryRefreshInterval)
Expand All @@ -36,7 +38,7 @@ func Run(
close(stopped)
return
case <-refresher.Chan():
refresh(ctx, clock, logger, service, cfg)
refresh(ctx, clock, logger, metrics, uc, cfg)
}
}
}
Expand All @@ -45,15 +47,16 @@ func NewRefresher(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
logger *zerolog.Logger,
) *Refresher {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -70,18 +73,22 @@ func refresh(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
cfg config.Config,
) {
// make sure the probes don't run beyond the next cycle of discovery
deadline := clock.Now().Add(cfg.DiscoveryRefreshInterval)
cnt, err := service.RefreshDetails(ctx, deadline)

result, err := uc.Execute(ctx, deadline)
if err != nil {
logger.Warn().Err(err).Msg("Unable to refresh details for servers")
return
}
if cnt > 0 {
logger.Info().Int("count", cnt).Msg("Added servers to details discovery queue")

if result.Count > 0 {
metrics.DiscoveryQueueProduced.Add(float64(result.Count))
logger.Info().Int("count", result.Count).Msg("Added servers to details discovery queue")
} else {
logger.Debug().Msg("Added no servers to details discovery queue")
}
Expand Down
30 changes: 18 additions & 12 deletions cmd/swat4master/modules/reviver/reviver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/core/usecases/reviveservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Reviver struct{}
Expand All @@ -18,7 +19,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
cfg config.Config,
) {
reviver := clock.NewTicker(cfg.DiscoveryRevivalInterval)
Expand All @@ -41,7 +43,7 @@ func Run(
close(stopped)
return
case <-reviverCh:
revive(ctx, clock, logger, service, cfg)
revive(ctx, clock, logger, metrics, uc, cfg)
}
}
}
Expand All @@ -50,15 +52,16 @@ func NewReviver(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
logger *zerolog.Logger,
) *Reviver {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -75,30 +78,33 @@ func revive(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
cfg config.Config,
) {
now := clock.Now()

// make sure the probes don't run beyond the next cycle of discovery
deadline := now.Add(cfg.DiscoveryRevivalInterval)

cnt, err := service.ReviveServers(
ctx,
ucRequest := reviveservers.NewRequest(
now.Add(-cfg.DiscoveryRevivalScope), // min scope
now.Add(-cfg.DiscoveryRevivalInterval), // max scope
now, // min countdown
now.Add(cfg.DiscoveryRevivalCountdown), // max countdown
deadline,
)
result, err := uc.Execute(ctx, ucRequest)
if err != nil {
logger.Warn().Err(err).Msg("Unable to refresh revive outdated servers")
logger.Warn().Err(err).Msg("Unable to revive outdated servers")
return
}
if cnt > 0 {
logger.Info().Int("count", cnt).Msg("Added servers to port discovery queue")

if result.Count > 0 {
metrics.DiscoveryQueueProduced.Add(float64(result.Count))
logger.Info().Int("count", result.Count).Msg("Added servers to revival discovery queue")
} else {
logger.Debug().Msg("Added no servers to port discovery queue")
logger.Debug().Msg("Added no servers to revival discovery queue")
}
}

Expand Down
Loading

0 comments on commit 7919d83

Please sign in to comment.