Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
Expand Down Expand Up @@ -56,6 +57,7 @@ type launcher struct {
don2donSharedPeer p2ptypes.SharedPeer
p2pStreamConfig p2ptypes.StreamConfig
metrics *launcherMetrics
localCapMgr localcapmgr.LocalCapabilityManager
}

// For V2 capabilities, shims are created once and their config is updated dynamically.
Expand Down Expand Up @@ -244,6 +246,11 @@ func (w *launcher) Close() error {
return nil
}

// LocalCapabilityManager is initialized after the Launcher is created
func (w *launcher) SetLocalCapabilityManager(lcm localcapmgr.LocalCapabilityManager) {
w.localCapMgr = lcm
}

func (w *launcher) Ready() error {
return nil
}
Expand Down Expand Up @@ -365,6 +372,16 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
w.lggr.Debug("My node doesn't belong to any DON families. No filtering will be applied.")
}

// Reconcile local capabilities: start/stop/restart capabilities based on registry state.
if w.localCapMgr != nil {
myDONs := make([]registrysyncer.DON, 0, len(myCapabilityDONs)+len(myWorkflowDONs))
myDONs = append(myDONs, myCapabilityDONs...)
myDONs = append(myDONs, myWorkflowDONs...)
if err := w.localCapMgr.Reconcile(ctx, myDONs); err != nil {
w.lggr.Errorw("Failed to reconcile local capabilities", "error", err)
}
}

belongsToAWorkflowDON := len(myWorkflowDONs) > 0
if belongsToAWorkflowDON {
myDON := myWorkflowDONs[0]
Expand Down
117 changes: 117 additions & 0 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,120 @@ func addCapabilityToDON(registry *registrysyncer.LocalRegistry, donID uint32, ca
CapabilityType: capabilityType,
}
}

func TestLauncher_OnNewRegistry_CallsLocalCapabilityManagerReconcile(t *testing.T) {
lggr := logger.Test(t)
registry := NewRegistry(lggr)
dispatcher := remoteMocks.NewDispatcher(t)

capabilityDonNodes := newNodes(4)
peer := mocks.NewPeer(t)
peer.On("UpdateConnections", mock.Anything).Return(nil)
peer.On("ID").Return(capabilityDonNodes[0])
peer.On("IsBootstrap").Return(false)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)

fullTriggerCapID := "streams-trigger@1.0.0"
mt := newMockTrigger(capabilities.MustNewCapabilityInfo(
fullTriggerCapID,
capabilities.CapabilityTypeTrigger,
"streams trigger",
))
require.NoError(t, registry.Add(t.Context(), mt))

triggerCapIDHash := RandomUTF8BytesWord()
capDonID := uint32(1)

localRegistry := buildLocalRegistry()
addDON(localRegistry, capDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, []string{"zone-a"}, 1, [][32]byte{triggerCapIDHash})
addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)

reconcileCalled := make(chan struct{}, 1)
mockLCM := &mockLocalCapabilityManager{
reconcileFn: func(ctx context.Context, dons []registrysyncer.DON) error {
assert.Len(t, dons, 1, "should pass all DONs")
assert.Equal(t, capDonID, dons[0].ID)
reconcileCalled <- struct{}{}
return nil
},
}

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.Anything).Return(nil)

launcher, err := NewLauncher(
lggr,
wrapper,
nil,
nil,
dispatcher,
registry,
&mockDonNotifier{},
)
require.NoError(t, err)
launcher.SetLocalCapabilityManager(mockLCM)
require.NoError(t, launcher.Start(t.Context()))
defer launcher.Close()

err = launcher.OnNewRegistry(t.Context(), localRegistry)
require.NoError(t, err)

select {
case <-reconcileCalled:
// success
default:
t.Fatal("Reconcile was not called on LocalCapabilityManager")
}
}

func TestLauncher_OnNewRegistry_NilLocalCapabilityManager(t *testing.T) {
lggr := logger.Test(t)
registry := NewRegistry(lggr)
dispatcher := remoteMocks.NewDispatcher(t)

nodes := newNodes(4)
peer := mocks.NewPeer(t)
peer.On("UpdateConnections", mock.Anything).Return(nil)
peer.On("ID").Return(nodes[0])
peer.On("IsBootstrap").Return(false)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)

localRegistry := buildLocalRegistry()
dID := uint32(1)
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, []string{"zone-a"}, 1, nil)

// Don't set localCapMgr - should not panic.
launcher, err := NewLauncher(
lggr,
wrapper,
nil,
nil,
dispatcher,
registry,
&mockDonNotifier{},
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
defer launcher.Close()

err = launcher.OnNewRegistry(t.Context(), localRegistry)
require.NoError(t, err)
}

// mockLocalCapabilityManager is a test mock that records calls to Reconcile.
type mockLocalCapabilityManager struct {
reconcileFn func(ctx context.Context, dons []registrysyncer.DON) error
}

func (m *mockLocalCapabilityManager) Start(context.Context) error { return nil }
func (m *mockLocalCapabilityManager) Close() error { return nil }
func (m *mockLocalCapabilityManager) Ready() error { return nil }
func (m *mockLocalCapabilityManager) HealthReport() map[string]error { return nil }
func (m *mockLocalCapabilityManager) Name() string { return "mockLocalCapMgr" }
func (m *mockLocalCapabilityManager) Reconcile(ctx context.Context, dons []registrysyncer.DON) error {
if m.reconcileFn != nil {
return m.reconcileFn(ctx, dons)
}
return nil
}
Loading
Loading