Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate other services to use cases (2nd iteration) #18

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cmd/swat4master/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/sergeii/swat4master/cmd/swat4master/container"
"github.com/sergeii/swat4master/cmd/swat4master/logging"
"github.com/sergeii/swat4master/cmd/swat4master/persistence"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/services/monitoring"
"github.com/sergeii/swat4master/internal/services/probe"
"github.com/sergeii/swat4master/internal/validation"
Expand All @@ -20,7 +19,6 @@ var Module = fx.Module("application",
fx.Provide(validation.New),
fx.Provide(persistence.Provide),
fx.Provide(monitoring.NewMetricService),
fx.Provide(finding.NewService),
fx.Provide(probe.NewService),
container.Module,
)
39 changes: 27 additions & 12 deletions cmd/swat4master/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/internal/core/usecases/addserver"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/core/usecases/getserver"
"github.com/sergeii/swat4master/internal/core/usecases/listservers"
"github.com/sergeii/swat4master/internal/core/usecases/refreshservers"
"github.com/sergeii/swat4master/internal/core/usecases/removeserver"
"github.com/sergeii/swat4master/internal/core/usecases/renewserver"
"github.com/sergeii/swat4master/internal/core/usecases/reportserver"
"github.com/sergeii/swat4master/internal/core/usecases/reviveservers"
)

type Container struct {
GetServer getserver.UseCase
AddServer addserver.UseCase
ListServers listservers.UseCase
ReportServer reportserver.UseCase
RenewServer renewserver.UseCase
RemoveServer removeserver.UseCase
GetServer getserver.UseCase
AddServer addserver.UseCase
ListServers listservers.UseCase
ReportServer reportserver.UseCase
RenewServer renewserver.UseCase
RemoveServer removeserver.UseCase
CleanServers cleanservers.UseCase
RefreshServers refreshservers.UseCase
ReviveServers reviveservers.UseCase
}

func New(
Expand All @@ -27,14 +33,20 @@ func New(
reportServerUseCase reportserver.UseCase,
renewServerUseCase renewserver.UseCase,
removeServerUseCase removeserver.UseCase,
cleanServersUseCase cleanservers.UseCase,
refreshServersUseCase refreshservers.UseCase,
reviveServersUseCase reviveservers.UseCase,
) Container {
return Container{
GetServer: getServerUseCase,
AddServer: addServerUseCase,
ListServers: listServersUseCase,
ReportServer: reportServerUseCase,
RenewServer: renewServerUseCase,
RemoveServer: removeServerUseCase,
GetServer: getServerUseCase,
AddServer: addServerUseCase,
ListServers: listServersUseCase,
ReportServer: reportServerUseCase,
RenewServer: renewServerUseCase,
RemoveServer: removeServerUseCase,
CleanServers: cleanServersUseCase,
RefreshServers: refreshServersUseCase,
ReviveServers: reviveServersUseCase,
}
}

Expand All @@ -45,5 +57,8 @@ var Module = fx.Module("container",
fx.Provide(reportserver.New),
fx.Provide(renewserver.New),
fx.Provide(removeserver.New),
fx.Provide(cleanservers.New),
fx.Provide(refreshservers.New),
fx.Provide(reviveservers.New),
fx.Provide(New),
)
40 changes: 27 additions & 13 deletions cmd/swat4master/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cleaner

import (
"context"
"time"

"github.com/jonboulle/clockwork"
"github.com/rs/zerolog"
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/cleaning"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Cleaner struct{}
Expand All @@ -18,7 +20,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *cleaning.Service,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
cfg config.Config,
) {
ticker := clock.NewTicker(cfg.CleanInterval)
Expand All @@ -39,11 +42,7 @@ func Run(
close(stopped)
return
case <-tickerCh:
if err := service.Clean(ctx, clock.Now().Add(-cfg.CleanRetention)); err != nil {
logger.Error().
Err(err).
Msg("Failed to clean outdated servers")
}
clean(ctx, clock, logger, metrics, uc, cfg.CleanRetention)
}
}
}
Expand All @@ -52,15 +51,16 @@ func NewCleaner(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *cleaning.Service,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
logger *zerolog.Logger,
) *Cleaner {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -73,10 +73,24 @@ func NewCleaner(
return &Cleaner{}
}

func clean(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
metrics *monitoring.MetricService,
uc cleanservers.UseCase,
retention time.Duration,
) {
resp, err := uc.Execute(ctx, clock.Now().Add(-retention))
if err != nil {
logger.Error().
Err(err).
Msg("Failed to clean outdated servers")
}
metrics.CleanerRemovals.Add(float64(resp.Count))
metrics.CleanerErrors.Add(float64(resp.Errors))
}

var Module = fx.Module("cleaner",
fx.Provide(
fx.Private,
cleaning.NewService,
),
fx.Provide(NewCleaner),
)
25 changes: 16 additions & 9 deletions cmd/swat4master/modules/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/core/usecases/refreshservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Refresher struct{}
Expand All @@ -18,7 +19,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
cfg config.Config,
) {
refresher := clock.NewTicker(cfg.DiscoveryRefreshInterval)
Expand All @@ -36,7 +38,7 @@ func Run(
close(stopped)
return
case <-refresher.Chan():
refresh(ctx, clock, logger, service, cfg)
refresh(ctx, clock, logger, metrics, uc, cfg)
}
}
}
Expand All @@ -45,15 +47,16 @@ func NewRefresher(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
logger *zerolog.Logger,
) *Refresher {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -70,18 +73,22 @@ func refresh(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc refreshservers.UseCase,
cfg config.Config,
) {
// make sure the probes don't run beyond the next cycle of discovery
deadline := clock.Now().Add(cfg.DiscoveryRefreshInterval)
cnt, err := service.RefreshDetails(ctx, deadline)

result, err := uc.Execute(ctx, deadline)
if err != nil {
logger.Warn().Err(err).Msg("Unable to refresh details for servers")
return
}
if cnt > 0 {
logger.Info().Int("count", cnt).Msg("Added servers to details discovery queue")

if result.Count > 0 {
metrics.DiscoveryQueueProduced.Add(float64(result.Count))
logger.Info().Int("count", result.Count).Msg("Added servers to details discovery queue")
} else {
logger.Debug().Msg("Added no servers to details discovery queue")
}
Expand Down
30 changes: 18 additions & 12 deletions cmd/swat4master/modules/reviver/reviver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/services/discovery/finding"
"github.com/sergeii/swat4master/internal/core/usecases/reviveservers"
"github.com/sergeii/swat4master/internal/services/monitoring"
)

type Reviver struct{}
Expand All @@ -18,7 +19,8 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
cfg config.Config,
) {
reviver := clock.NewTicker(cfg.DiscoveryRevivalInterval)
Expand All @@ -41,7 +43,7 @@ func Run(
close(stopped)
return
case <-reviverCh:
revive(ctx, clock, logger, service, cfg)
revive(ctx, clock, logger, metrics, uc, cfg)
}
}
}
Expand All @@ -50,15 +52,16 @@ func NewReviver(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
logger *zerolog.Logger,
) *Reviver {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, service, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -75,30 +78,33 @@ func revive(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
service *finding.Service,
metrics *monitoring.MetricService,
uc reviveservers.UseCase,
cfg config.Config,
) {
now := clock.Now()

// make sure the probes don't run beyond the next cycle of discovery
deadline := now.Add(cfg.DiscoveryRevivalInterval)

cnt, err := service.ReviveServers(
ctx,
ucRequest := reviveservers.NewRequest(
now.Add(-cfg.DiscoveryRevivalScope), // min scope
now.Add(-cfg.DiscoveryRevivalInterval), // max scope
now, // min countdown
now.Add(cfg.DiscoveryRevivalCountdown), // max countdown
deadline,
)
result, err := uc.Execute(ctx, ucRequest)
if err != nil {
logger.Warn().Err(err).Msg("Unable to refresh revive outdated servers")
logger.Warn().Err(err).Msg("Unable to revive outdated servers")
return
}
if cnt > 0 {
logger.Info().Int("count", cnt).Msg("Added servers to port discovery queue")

if result.Count > 0 {
metrics.DiscoveryQueueProduced.Add(float64(result.Count))
logger.Info().Int("count", result.Count).Msg("Added servers to revival discovery queue")
} else {
logger.Debug().Msg("Added no servers to port discovery queue")
logger.Debug().Msg("Added no servers to revival discovery queue")
}
}

Expand Down
Loading
Loading