Skip to content

Commit

Permalink
[KS-193] Pass MercuryTriggerService to Mercury Transmitter (#13118)
Browse files Browse the repository at this point in the history
1. Add EnableTriggerCapability flag to Relay config
2. Create MercuryTriggerService lazily, on the first call to NewMercuryProvider()
3. Make it available in the Transmitter (no-op for now)
  • Loading branch information
bolekk committed May 9, 2024
1 parent 579ab3d commit 6008d73
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .changeset/great-kids-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Pass MercuryTriggerService to Mercury Transmitter
27 changes: 22 additions & 5 deletions core/capabilities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,38 @@ func (s *registrySyncer) Start(ctx context.Context) error {
}
if slices.Contains(triggerDONPeers, myId) {
s.lggr.Info("member of a capability DON - starting remote publishers")

{
// ---- This is for local tests only, until a full-blown Syncer is implemented
// ---- Normally this is set up asynchronously (by the Relayer + job specs in Mercury's case)
localTrigger := triggers.NewMercuryTriggerService(1000, s.lggr)
mockMercuryDataProducer := NewMockMercuryDataProducer(localTrigger, s.lggr)
err = s.registry.Add(ctx, localTrigger)
if err != nil {
s.lggr.Errorw("failed to add local trigger capability to registry", "error", err)
return err
}
s.subServices = append(s.subServices, localTrigger)
s.subServices = append(s.subServices, mockMercuryDataProducer)
// ----
}

underlying, err2 := s.registry.GetTrigger(ctx, capId)
if err2 != nil {
// NOTE: it's possible that the jobs are not launched yet at this moment.
// If not found yet, Syncer won't add to Registry but retry on the next tick.
return err2
}
workflowDONs := map[string]capabilities.DON{
workflowDonInfo.ID: workflowDonInfo,
}
underlying := triggers.NewMercuryTriggerService(1000, s.lggr)
triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, triggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr)
err = s.dispatcher.SetReceiver(capId, triggerCapabilityDonInfo.ID, triggerCap)
if err != nil {
s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", triggerCapabilityDonInfo.ID, "error", err)
return err
}
s.subServices = append(s.subServices, underlying)
s.subServices = append(s.subServices, triggerCap)
// NOTE: temporary mock Mercury data producer
mockMercuryDataProducer := NewMockMercuryDataProducer(underlying, s.lggr)
s.subServices = append(s.subServices, mockMercuryDataProducer)
}
// NOTE: temporary service start - should be managed by capability creation
for _, srv := range s.subServices {
Expand Down
26 changes: 23 additions & 3 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

ocr3capability "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
Expand Down Expand Up @@ -82,8 +83,9 @@ type Relayer struct {
capabilitiesRegistry coretypes.CapabilitiesRegistry

// Mercury
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig
mercuryORM mercury.ORM
transmitterCfg mercury.TransmitterConfig
triggerCapability *triggers.MercuryTriggerService

// LLO/data streams
cdcFactory llo.ChannelDefinitionCacheFactory
Expand Down Expand Up @@ -154,6 +156,9 @@ func (r *Relayer) Start(context.Context) error {
}

func (r *Relayer) Close() error {
if r.triggerCapability != nil {
return r.triggerCapability.Close()
}
return nil
}

Expand Down Expand Up @@ -255,6 +260,21 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
clients[server.URL] = client
}

// initialize trigger capability service lazily
if relayConfig.EnableTriggerCapability && r.triggerCapability == nil {
if r.capabilitiesRegistry == nil {
lggr.Errorw("trigger capability is enabled but capabilities registry is not set")
} else {
r.triggerCapability = triggers.NewMercuryTriggerService(0, lggr)
if err := r.triggerCapability.Start(ctx); err != nil {
return nil, err
}
if err := r.capabilitiesRegistry.Add(ctx, r.triggerCapability); err != nil {
return nil, err
}
}
}

// FIXME: We actually know the version here since it's in the feed ID, can
// we use generics to avoid passing three of this?
// https://smartcontract-it.atlassian.net/browse/MERC-1414
Expand All @@ -273,7 +293,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec)
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, r.triggerCapability)

return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
}
Expand Down
8 changes: 6 additions & 2 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"
Expand Down Expand Up @@ -111,7 +112,8 @@ type mercuryTransmitter struct {

servers map[string]*server

codec TransmitterReportDecoder
codec TransmitterReportDecoder
triggerCapability *triggers.MercuryTriggerService

feedID mercuryutils.FeedID
jobID int32
Expand Down Expand Up @@ -275,7 +277,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
Expand All @@ -302,6 +304,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
cfg,
servers,
codec,
triggerCapability,
feedID,
jobID,
fmt.Sprintf("%x", fromAccount),
Expand Down Expand Up @@ -388,6 +391,7 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R
vs[i] = v
}
rawReportCtx := evmutil.RawReportContext(reportCtx)
// TODO(KS-194): send report to mt.triggerCapability.ProcessReport()

payload, err := PayloadTypes.Pack(rawReportCtx, []byte(report), rs, ss, vs)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions core/services/relay/evm/mercury/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV1Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -62,7 +62,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV2Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -76,7 +76,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV3Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -95,7 +95,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
clients[sURL2] = c
clients[sURL3] = c

mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
mt.servers[sURL2].q.Init([]*Transmission{})
Expand Down Expand Up @@ -137,7 +137,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -153,7 +153,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -167,7 +167,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
_, err := mt.LatestTimestamp(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand Down Expand Up @@ -197,7 +197,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
return out, nil
},
}
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand Down Expand Up @@ -240,7 +240,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)

t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) {
codec.val = originalPrice
Expand Down Expand Up @@ -271,7 +271,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.NoError(t, err)

Expand All @@ -285,7 +285,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
_, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand Down Expand Up @@ -315,7 +315,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -331,7 +331,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -344,7 +344,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand All @@ -362,7 +362,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x")
Expand Down
3 changes: 2 additions & 1 deletion core/services/relay/evm/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ type RelayConfig struct {
SendingKeys pq.StringArray `json:"sendingKeys"`

// Mercury-specific
FeedID *common.Hash `json:"feedID"`
FeedID *common.Hash `json:"feedID"`
EnableTriggerCapability bool `json:"enableTriggerCapability"`
}

var ErrBadRelayConfig = errors.New("bad relay config")
Expand Down

0 comments on commit 6008d73

Please sign in to comment.