From 42d73179ef4312a8560d5e15be03a6ab617bc3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Fri, 16 Feb 2024 11:25:15 +0100 Subject: [PATCH] private/mud: service lifecycle utilities Change-Id: I3a05a4f84f414a5fed400f6e2ec5166551bb0fb4 --- private/mud/README.md | 123 +++++++++++++ private/mud/component.go | 142 +++++++++++++++ private/mud/dependency.go | 61 +++++++ private/mud/lifecycle.go | 63 +++++++ private/mud/mud.go | 368 ++++++++++++++++++++++++++++++++++++++ private/mud/mud_test.go | 186 +++++++++++++++++++ private/mud/selector.go | 53 ++++++ private/mud/sort.go | 78 ++++++++ 8 files changed, 1074 insertions(+) create mode 100644 private/mud/README.md create mode 100644 private/mud/component.go create mode 100644 private/mud/dependency.go create mode 100644 private/mud/lifecycle.go create mode 100644 private/mud/mud.go create mode 100644 private/mud/mud_test.go create mode 100644 private/mud/selector.go create mode 100644 private/mud/sort.go diff --git a/private/mud/README.md b/private/mud/README.md new file mode 100644 index 000000000000..7fcbc092dfa4 --- /dev/null +++ b/private/mud/README.md @@ -0,0 +1,123 @@ +# mud + +mud is a package for lazily starting up a large set of services and chores. +The `mud` package name comes from a common architectural pattern called "Big Ball of Mud". + +You can think about mud as a very huge map of service instances: `map[type]component`. +Where component includes the singleton instance and functions to initialize the singleton and run/close them. + +Components can also depend on each other, and there are helper function to filter the required components (and/or +initialize/start them). + +Compared to other similar libraries, like https://github.com/uber-go/fx or https://github.com/uber-go/dig, mud is just a +very flexible framework, it wouldn't like to restrict the usage. Therefore advanced workflows also can be implemented +with filtering different graphs and using them. + +Users of this library has more power (and more responsibility). + +## Getting started + +You can create the instance registry with: + +``` +mud := mud.NewBall() +``` + +Register a new component: + +``` +Provide[your.Service](ball, func() your.Service { + return your.NewService() +}) +``` + +Now, your component is registered, but not yet initialized. You should select some of the services to Init / run them: + +``` +err := mud.ForEach(ball, mud.Initialize(context.TODO())) +if err != nil { + panic(err) +} +``` + +Now your component is initialized: + +``` +fmt.Println(mud.Find(ball, mud.All)[0].Instance()) +``` + +This one selected the first component (we registered only one), but you can also use different selectors. This one +selects the components by type. + +``` +fmt.Println(mud.Find(ball, mud.Select[your.Service](ball))[0].Instance()) +``` + +Or, of you are sure, it's there: + +``` +fmt.Println(mud.MustLookup[your.Service](ball)) +``` + +## Dependencies and dependency injection + +Dependencies are automatically injected. Let's say you have two structs: + +``` +type Service struct { +} + +func NewService() Service { + return Service{} +} + +type Endpoint struct { + Service Service +} + +func NewEndpoint(service Service) Endpoint { + return Endpoint{ + Service: service, + } +} +``` + +Now you can register both: + +``` +mud.Provide[your.Service](ball, your.NewService) +mud.Provide[your.Endpoint](ball, your.NewEndpoint) +``` + +When you initialize the Endpoint, Service will be injected (if the instance is available!!!): + +``` +err := mud.MustDo[your.Service](ball, mud.Initialize(context.TODO())) +if err != nil { + panic(err) +} + +err = mud.MustDo[your.Endpoint](ball, mud.Initialize(context.TODO())) +if err != nil { + panic(err) +} +``` + +But instead of initializing manually, you can also just ask what you need, and initialize everything in the right order + +``` +err := mud.ForEachDependency(ball, mud.Select[your.Endpoint](ball), mud.Initialize(context.TODO()), mud.All) +``` + +## Views + +Views are useful when you already have sg. registered, but you would like to make it fully or partially available under +different type: + +``` +mud.Provide[satellite.DB](ball, OpenSatelliteDB) +mud.View[satellite.DB, gracefulexit.DB](ball, satellite.DB.GracefulExit) +``` + +This registers a `satellite.DB` (first line) and a `gracefulexit.DB` (second line). And if `gracefulexit.DB` is needed +for injection, it will call the function to get it. \ No newline at end of file diff --git a/private/mud/component.go b/private/mud/component.go new file mode 100644 index 000000000000..16d2b406802b --- /dev/null +++ b/private/mud/component.go @@ -0,0 +1,142 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "context" + "reflect" + "strings" + "time" + + "golang.org/x/sync/errgroup" +) + +// StageName is the unique identifier of the stages (~lifecycle events). +type StageName string + +// Component manages the lifecycle of a singleton Golang struct. +type Component struct { + target reflect.Type + + instance any + + // Requirements are other components which is used by this component. + // All requirements will be initialized/started before creating/running the component. + requirements []reflect.Type + + create *Stage + + run *Stage + + close *Stage + + tags []any +} + +// Name returns with the human friendly name of the component. +func (c *Component) Name() string { + return c.target.String() +} + +// ID is the unque identifier of the component. +func (c *Component) ID() string { + return fullyQualifiedTypeName(c.target) +} + +// Init initializes the internal singleton instance. +func (c *Component) Init(ctx context.Context) error { + if c.instance != nil { + return nil + } + c.create.started = time.Now() + err := c.create.run(nil, ctx) + c.create.finished = time.Now() + return err +} + +// Run executes the Run stage function. +func (c *Component) Run(ctx context.Context, eg *errgroup.Group) error { + if c.run == nil || !c.run.started.IsZero() { + return nil + } + if c.instance == nil { + return nil + } + + if c.run.background { + eg.Go(func() error { + c.run.started = time.Now() + err := c.run.run(c.instance, ctx) + c.run.finished = time.Now() + return err + }) + return nil + } else { + c.run.started = time.Now() + err := c.run.run(c.instance, ctx) + c.run.finished = time.Now() + return err + } +} + +// Close calls the Close stage function. +func (c *Component) Close(ctx context.Context) error { + if c.close == nil || c.close.run == nil || !c.close.started.IsZero() || c.instance == nil { + return nil + } + c.close.started = time.Now() + err := c.close.run(c.instance, ctx) + c.close.finished = time.Now() + return err +} + +// String returns with a string representation of the component. +func (c *Component) String() string { + out := c.target.String() + out += stageStr(c.create, "i") + out += stageStr(c.run, "r") + out += stageStr(c.close, "c") + return out +} + +func stageStr(stage *Stage, s string) string { + if stage == nil { + return "_" + } + if stage.started.IsZero() { + return strings.ToLower(s) + } + return strings.ToUpper(s) +} + +func (c *Component) addRequirement(in reflect.Type) { + for _, req := range c.requirements { + if req == in { + return + } + } + c.requirements = append(c.requirements, in) +} + +// Instance returns the singleton instance of the component. Can be null, if not yet initialized. +func (c *Component) Instance() any { + return c.instance +} + +// GetTarget returns with type, which is used as n identifier in mud. +func (c *Component) GetTarget() reflect.Type { + return c.target +} + +// Stage represents a function which should be called on the component at the right time (like start, stop, init). +type Stage struct { + run func(any, context.Context) error + + // should be executed in the background or not. + background bool + + started time.Time + + finished time.Time +} diff --git a/private/mud/dependency.go b/private/mud/dependency.go new file mode 100644 index 000000000000..1491980272da --- /dev/null +++ b/private/mud/dependency.go @@ -0,0 +1,61 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "fmt" + "reflect" +) + +// Optional tag is used to mark components which may not required. +type Optional struct{} + +// Find selects components matching the selector. +func Find(ball *Ball, selector ComponentSelector) (result []*Component) { + for _, c := range ball.registry { + if selector(c) { + result = append(result, c) + } + } + return result +} + +// FindSelectedWithDependencies selects components matching the selector, together with all the dependencies. +func FindSelectedWithDependencies(ball *Ball, selector ComponentSelector) (result []*Component) { + dependencies := map[reflect.Type]struct{}{} + for _, component := range ball.registry { + if selector(component) { + collectDependencies(ball, component, dependencies) + } + } + for k := range dependencies { + result = append(result, mustLookupByType(ball, k)) + } + return filterComponents(sortedComponents(ball), result) +} + +func collectDependencies(ball *Ball, c *Component, result map[reflect.Type]struct{}) { + // don't check it again + for k := range result { + if c.target == k { + return + } + } + + // don't check it again + result[c.target] = struct{}{} + + for _, dep := range c.requirements { + // ignore if optional + dc, found := lookupByType(ball, dep) + if !found { + panic(fmt.Sprintf("Dependency %s for %s is missing", dep, c.ID())) + } + _, optional := findTag[Optional](dc) + if optional { + continue + } + collectDependencies(ball, mustLookupByType(ball, dep), result) + } +} diff --git a/private/mud/lifecycle.go b/private/mud/lifecycle.go new file mode 100644 index 000000000000..479056096041 --- /dev/null +++ b/private/mud/lifecycle.go @@ -0,0 +1,63 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "context" + "time" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// RunWithDependencies will init and run all components which are matched by the selector. +func RunWithDependencies(ctx context.Context, ball *Ball, selector ComponentSelector) error { + log := ball.getLogger() + return runComponents(ctx, log, FindSelectedWithDependencies(ball, selector)) +} + +// Run runs the required component and all dependencies in the right order. +func runComponents(ctx context.Context, log *zap.Logger, components []*Component) error { + err := forEachComponent(components, func(component *Component) error { + log.Info("init", zap.String("component", component.Name())) + return component.Init(ctx) + }) + if err != nil { + return err + } + g, ctx := errgroup.WithContext(ctx) + err = forEachComponent(components, func(component *Component) error { + log.Info("init", zap.String("starting", component.Name())) + return component.Run(ctx, g) + }) + if err != nil { + return err + } + return g.Wait() +} + +// CloseAll calls the close callback stage on all initialized components. +func CloseAll(ball *Ball, timeout time.Duration) error { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + components := ball.registry + reverse(components) + log := ball.getLogger() + return forEachComponent(components, func(component *Component) error { + log.Info("closing", zap.String("component", component.Name())) + if component.instance != nil { + return component.Close(ctx) + } + return nil + }) +} + +// Reverse reverses the elements of the slice in place. +// TODO: use slices.Reverse when minimum golang version is updated. +func reverse[S ~[]E, E any](s S) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +} diff --git a/private/mud/mud.go b/private/mud/mud.go new file mode 100644 index 000000000000..66a7d4dce4ec --- /dev/null +++ b/private/mud/mud.go @@ -0,0 +1,368 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "context" + "fmt" + "reflect" + "time" + + "github.com/zeebo/errs" + "go.uber.org/zap" +) + +// Ball is the component registry. +type Ball struct { + registry []*Component +} + +// NewBall creates a new component registry. +func NewBall() *Ball { + return &Ball{} +} + +// getLogger returns with the zap Logger, if component is registered. +// used for internal logging. +func (ball *Ball) getLogger() *zap.Logger { + if logger := lookup[*zap.Logger](ball); logger != nil { + if logger.instance == nil { + _ = logger.Init(context.Background()) + } + if logger.instance != nil { + return logger.instance.(*zap.Logger) + } + } + return zap.NewNop() +} + +// RegisterManual registers a component manually. +// Most of the time you need either Provide or View instead of this. +func RegisterManual[T any]( + ball *Ball, + factory func(ctx context.Context) (T, error), +) { + if component := lookup[T](ball); component != nil { + panic("duplicate registration, " + name[T]()) + } + component := &Component{} + component.create = &Stage{ + run: func(a any, ctx context.Context) (err error) { + component.instance, err = factory(ctx) + return err + }, + } + + component.target = reflect.TypeOf((*T)(nil)).Elem() + ball.registry = append(ball.registry, component) +} + +// Tag attaches a tag to the component registration. +func Tag[A any, Tag any](ball *Ball, tag Tag) { + c := mustLookup[A](ball) + + // we don't allow duplicated registrations, as we always return with the first value. + for ix, existing := range c.tags { + _, ok := existing.(Tag) + if ok { + c.tags[ix] = tag + return + } + } + c.tags = append(c.tags, tag) +} + +// GetTag returns with attached tag (if attached). +func GetTag[A any, Tag any](ball *Ball) (Tag, bool) { + c := mustLookup[A](ball) + return findTag[Tag](c) +} + +func findTag[Tag any](c *Component) (Tag, bool) { + for _, tag := range c.tags { + c, ok := tag.(Tag) + if ok { + return c, true + } + } + var t Tag + return t, false +} + +// DependsOn creates a dependency relation between two components. +// With the help of the dependency graph, they can be executed in the right order. +func DependsOn[BASE any, DEPENDENCY any](ball *Ball) { + c := mustLookup[BASE](ball) + c.addRequirement(typeOf[DEPENDENCY]()) +} + +// ForEach executes a callback action on all the selected components. +func ForEach(ball *Ball, callback func(component *Component) error, selectors ...ComponentSelector) error { + return forEachComponent(sortedComponents(ball), callback, selectors...) +} + +// ForEachDependency executes a callback action on all the components, matching the target selector and dependencies, but only if selectors parameter is matching them. +func ForEachDependency(ball *Ball, target ComponentSelector, callback func(component *Component) error, selectors ...ComponentSelector) error { + return forEachComponent(FindSelectedWithDependencies(ball, target), callback, selectors...) +} + +// Initialize components as ForEach callback. +func Initialize(ctx context.Context) func(c *Component) error { + return func(c *Component) error { + return c.Init(ctx) + } +} + +func forEachComponent(components []*Component, callback func(component *Component) error, selectors ...ComponentSelector) error { + for _, c := range components { + if len(selectors) == 0 { + err := callback(c) + if err != nil { + return err + } + } + for _, s := range selectors { + if s(c) { + err := callback(c) + if err != nil { + return err + } + } + break + } + } + return nil +} + +// Execute executes a function with injecting all the required dependencies with type based Dependency Injection. +func Execute[A any](ctx context.Context, ball *Ball, factory interface{}) (A, error) { + var a A + response, err := runWithParams(ctx, ball, factory) + if err != nil { + return a, err + } + if len(response) > 1 { + if response[1].Interface() != nil { + return a, response[1].Interface().(error) + } + } + if response[0].Interface() == nil { + return a, errs.New("Provider factory is executed without error, but returner with nil instance. " + name[A]()) + } + + return response[0].Interface().(A), nil +} + +// Execute0 executes a function with injection all required parameters. Same as Execute but without return type. +func Execute0(ctx context.Context, ball *Ball, factory interface{}) error { + _, err := runWithParams(ctx, ball, factory) + if err != nil { + return err + } + return nil +} + +// injectAnd execute calls the `factory` method, finding all required parameters in the registry. +func runWithParams(ctx context.Context, ball *Ball, factory interface{}) ([]reflect.Value, error) { + ft := reflect.TypeOf(factory) + if reflect.Func != ft.Kind() { + panic("Provider argument must be a func()") + } + + specificError := func(t reflect.Type, ix int, reason string) error { + return errs.New("Couldn't inject %s to the %d argument of the provider function %v: %s", t, ix, reflect.ValueOf(factory).String(), reason) + } + + var args []reflect.Value + for i := 0; i < ft.NumIn(); i++ { + // ball can be injected to anywhere. But it's better to not use. + if ft.In(i) == reflect.TypeOf(ball) { + args = append(args, reflect.ValueOf(ball)) + continue + } + + // context can be injected without strong dependency + if ctx != nil && ft.In(i) == typeOf[context.Context]() { + args = append(args, reflect.ValueOf(ctx)) + continue + } + + dep, ok := lookupByType(ball, ft.In(i)) + if ok { + if dep.instance == nil { + return nil, specificError(ft.In(i), i, "instance is nil (not yet initialized)") + } + args = append(args, reflect.ValueOf(dep.instance)) + continue + } + return nil, specificError(ft.In(i), i, "instance is not registered") + + } + + return reflect.ValueOf(factory).Call(args), nil +} + +// Provide registers a new instance to the dependency pool. +// Run/Close methods are auto-detected (stage is created if they exist). +func Provide[A any](ball *Ball, factory interface{}) { + RegisterManual[A](ball, func(ctx context.Context) (A, error) { + return Execute[A](ctx, ball, factory) + }) + + t := typeOf[A]() + component, _ := lookupByType(ball, t) + + // auto-detect Run method for Run stage + runF, found := t.MethodByName("Run") + if found { + component.run = &Stage{ + background: true, + } + registerFunc[A](runF, component.run, "Run") + } + + // auto-detect Close method for Close stage + closeF, found := t.MethodByName("Close") + if found { + component.close = &Stage{} + registerFunc[A](closeF, component.close, "Close") + } + + // mark dependencies + ft := reflect.TypeOf(factory) + if ft.Kind() != reflect.Func { + panic("factory parameter of Provide must be a func") + } + for i := 0; i < ft.NumIn(); i++ { + // internal dependency without component + if ft.In(i) == reflect.TypeOf(ball) { + continue + } + + // context can be injected any time + if ft.In(i).String() == "context.Context" { + continue + } + + component.addRequirement(ft.In(i)) + } +} + +// registerFunc tries to find a func with supported signature, to be used for stage runner func. +func registerFunc[A any](f reflect.Method, run *Stage, name string) { + if !f.Func.IsValid() { + return + } + switch runner := f.Func.Interface().(type) { + case func(a A, ctx context.Context) error: + run.run = func(a any, ctx context.Context) error { + return runner(a.(A), ctx) + } + case func(a A) error: + run.run = func(a any, ctx context.Context) error { + return runner(a.(A)) + } + default: + panic(fmt.Sprintf("Unsupported %s method signature: %v", typeOf[A](), name)) + } +} + +// Supply registers and instance which is already initialized. +func Supply[T any](ball *Ball, t T) { + if lookup[T](ball) != nil { + panic(fmt.Sprintf("Component instance is already provided with Supply: %v", typeOf[T]())) + } + if typeOf[T]().Kind() == reflect.Func { + panic("function type for supply is not yet supported") + } + + ball.registry = append(ball.registry, &Component{ + target: typeOf[T](), + instance: t, + create: &Stage{ + started: time.Now(), + finished: time.Now(), + }, + }) +} + +// View is lightweight component, which provides a type based on a existing instances. +func View[A any, B any](ball *Ball, convert func(A) B) { + RegisterManual[B](ball, func(ctx context.Context) (B, error) { + a := mustLookup[A](ball) + return convert(a.instance.(A)), nil + }) + component := mustLookup[B](ball) + component.requirements = append(component.requirements, mustLookup[A](ball).target) +} + +// Dereference is a simple transformation to make real value from a pointer. Useful with View. +// for example: `View[*DB, DB](ball, Dereference[DB])`. +func Dereference[A any](a *A) A { + return *a +} + +func name[T any]() string { + var a [0]T + return reflect.TypeOf(a).Elem().String() +} + +func lookup[T any](ball *Ball) *Component { + var t [0]T + tzpe := reflect.TypeOf(t).Elem() + for _, c := range ball.registry { + if c.target == tzpe { + return c + } + } + return nil +} + +func mustLookup[T any](ball *Ball) *Component { + c := lookup[T](ball) + if c == nil { + panic("component is missing: " + name[T]()) + } + return c +} + +func lookupByType(ball *Ball, tzpe reflect.Type) (*Component, bool) { + for _, c := range ball.registry { + if c.target == tzpe { + return c, true + } + } + return nil, false +} + +func mustLookupByType(ball *Ball, tzpe reflect.Type) *Component { + c, found := lookupByType(ball, tzpe) + if !found { + panic("component is missing: " + tzpe.String()) + } + return c +} + +// MustLookup returns with the registered component instance (or panic). +func MustLookup[T any](ball *Ball) T { + component := mustLookup[T](ball) + if component.instance == nil { + panic("lookup of an uninitialized component " + name[T]()) + } + return component.instance.(T) +} + +func typeOf[A any]() reflect.Type { + var a [0]A + return reflect.TypeOf(a).Elem() +} + +func fullyQualifiedTypeName(t reflect.Type) string { + if t.Kind() == reflect.Pointer { + return "*" + fullyQualifiedTypeName(t.Elem()) + } else if t.Kind() == reflect.Slice { + return "[]" + fullyQualifiedTypeName(t.Elem()) + } + return t.PkgPath() + "." + t.Name() +} diff --git a/private/mud/mud_test.go b/private/mud/mud_test.go new file mode 100644 index 000000000000..8e19a5ddbd56 --- /dev/null +++ b/private/mud/mud_test.go @@ -0,0 +1,186 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" +) + +func ExampleBall() { + ball := NewBall() + Provide[string](ball, func() string { + return "test" + }) + Provide[string](ball, func() string { + return "test" + }) + components := Find(ball, All) + _ = components[0].Init(context.Background()) + fmt.Println(components[0].Instance()) +} + +func TestSortedDependency(t *testing.T) { + ball := NewBall() + Provide[DB](ball, NewDB) + Provide[Service1](ball, NewService1) + Provide[Service2](ball, NewService2) + + sorted := sortedComponents(ball) + require.Equal(t, "storj.io/storj/private/mud.DB", sorted[0].ID()) + require.Equal(t, "storj.io/storj/private/mud.Service1", sorted[1].ID()) + require.Equal(t, "storj.io/storj/private/mud.Service2", sorted[2].ID()) +} + +type Key struct{} + +func TestView(t *testing.T) { + ctx := testcontext.New(t) + ball := NewBall() + Supply[*DB](ball, &DB{status: "test"}) + View[*DB, DB](ball, Dereference[DB]) + // pointer registered, but value is received + Provide[Key](ball, func(db DB) Key { + return Key{} + }) + err := ForEach(ball, func(component *Component) error { + return component.Init(ctx) + }, All) + require.NoError(t, err) +} + +func TestTags(t *testing.T) { + ball := NewBall() + Supply[*DB](ball, &DB{status: "test"}) + Tag[*DB, Tag1](ball, Tag1{ + Value: "ahoj", + }) + + tag, found := GetTag[*DB, Tag1](ball) + require.True(t, found) + require.Equal(t, "ahoj", tag.Value) + + Tag[*DB, Tag1](ball, Tag1{ + Value: "second", + }) + tag, found = GetTag[*DB, Tag1](ball) + require.True(t, found) + require.Equal(t, "second", tag.Value) +} + +func TestExecute(t *testing.T) { + ctx := testcontext.New(t) + ball := NewBall() + Supply[string](ball, "Joe") + result, err := Execute[string](ctx, ball, func(ctx context.Context, name string) string { + if ctx != nil { + return "hello " + name + } + // context was not injected + return "error" + }) + require.NoError(t, err) + require.Equal(t, "hello Joe", result) +} + +type Tag1 struct { + Value string +} + +type Tag2 struct { + Value string +} + +type T1 struct{} +type T2 struct{} +type T3 struct{} +type T4 struct{} +type T5 struct{} +type I interface{} + +type DB struct { + status string +} + +func NewDB() DB { + return DB{ + status: "auto", + } +} + +func (s DB) Close(ctx context.Context) error { + fmt.Println("Closing DB") + return nil +} + +type Service interface { + Run(ctx context.Context) error + Close(ctx context.Context) error +} + +type Service1 struct { + DB DB +} + +func (s Service1) Close(ctx context.Context) error { + fmt.Println("Closing service 1") + return nil +} + +func (s Service1) Run(ctx context.Context) error { + for i := 0; i < 20; i++ { + if ctx.Err() != nil { + return ctx.Err() + } + fmt.Println("running service1", i) + time.Sleep(1 * time.Second) + } + return nil +} + +func NewService1(db DB) Service1 { + return Service1{ + DB: db, + } +} + +var _ Service = (*Service1)(nil) + +type Service2 struct { + Service1 Service1 +} + +func (s Service2) Close(ctx context.Context) error { + fmt.Println("Closing service2") + return nil +} + +func (s Service2) Run(ctx context.Context) error { + for i := 0; i < 10; i++ { + if ctx.Err() != nil { + return ctx.Err() + } + fmt.Println("running service2", i) + time.Sleep(1 * time.Second) + } + return nil +} + +func NewService2(service1 Service1) Service2 { + return Service2{ + Service1: service1, + } +} + +var _ Service = (*Service2)(nil) + +type Runnable interface { + Run(ctx context.Context) error +} diff --git a/private/mud/selector.go b/private/mud/selector.go new file mode 100644 index 000000000000..8af3d59ad560 --- /dev/null +++ b/private/mud/selector.go @@ -0,0 +1,53 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +// Select is a component selector based on the specified type ([A]). +func Select[A any](ball *Ball) ComponentSelector { + t := typeOf[A]() + mustLookup[A](ball) + return func(c *Component) bool { + return c.target == t + } +} + +// ComponentSelector can filter components. +type ComponentSelector func(c *Component) bool + +// All is a ComponentSelector which matches all components. +func All(_ *Component) bool { + return true +} + +// And selects components which matches all the selectors. +func And(selectors ...ComponentSelector) ComponentSelector { + return func(c *Component) bool { + for _, s := range selectors { + if !s(c) { + return false + } + } + return true + } +} + +// Or selects components which matches any of the selectors. +func Or(selectors ...ComponentSelector) ComponentSelector { + return func(c *Component) bool { + for _, s := range selectors { + if s(c) { + return true + } + } + return false + } +} + +// Tagged is a selector, checking an annotation key/value. +func Tagged[Tag any]() func(c *Component) bool { + return func(c *Component) bool { + _, found := findTag[Tag](c) + return found + } +} diff --git a/private/mud/sort.go b/private/mud/sort.go new file mode 100644 index 000000000000..328be02d505c --- /dev/null +++ b/private/mud/sort.go @@ -0,0 +1,78 @@ +// Copyright (C) 2024 Storj Labs, Inc. +// See LICENSE for copying information. + +package mud + +import ( + "reflect" +) + +// sortComponents returns components in order to start/run/close them. +// it implements a simple topology sorting based on Kahn's algorithm: https://en.wikipedia.org/wiki/Topological_sorting +func sortedComponents(ball *Ball) (sorted []*Component) { + // key should be initialized before the values + dependencyGraph := make(map[reflect.Type][]reflect.Type) + + for _, component := range ball.registry { + if _, found := dependencyGraph[component.target]; !found { + dependencyGraph[component.target] = make([]reflect.Type, 0) + } + + dependencyGraph[component.target] = append(dependencyGraph[component.target], component.requirements...) + } + + var next []reflect.Type + + findNext := func() { + filtered := map[reflect.Type][]reflect.Type{} + for c, deps := range dependencyGraph { + if len(deps) == 0 { + next = append(next, c) + } else { + filtered[c] = deps + } + } + dependencyGraph = filtered + } + + without := func(deps []reflect.Type, s reflect.Type) (res []reflect.Type) { + for _, d := range deps { + if d != s { + res = append(res, d) + } + } + return res + } + + findNext() + + for len(next) > 0 { + s := next[0] + next = next[1:] + + component, found := lookupByType(ball, s) + if !found { + panic("component is not registered " + s.String()) + } + sorted = append(sorted, component) + + for c, deps := range dependencyGraph { + dependencyGraph[c] = without(deps, s) + } + + findNext() + } + return sorted +} + +func filterComponents(sorted []*Component, required []*Component) (result []*Component) { + for _, s := range sorted { + for _, r := range required { + if r.target == s.target { + result = append(result, r) + break + } + } + } + return result +}