From 15fab1daa84348e96a7895280209cb73e456a1c5 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Wed, 8 May 2024 14:42:36 -0500 Subject: [PATCH] core/services: fix ocrWrapper saveError contexts (#13139) --- .changeset/big-trees-help.md | 5 ++ core/services/ocr/delegate.go | 4 +- core/services/ocr2/delegate.go | 90 +++++++++++++++++--------- core/services/ocrbootstrap/delegate.go | 18 +++--- core/services/ocrcommon/ocr_logger.go | 30 +++++++++ 5 files changed, 104 insertions(+), 43 deletions(-) create mode 100644 .changeset/big-trees-help.md create mode 100644 core/services/ocrcommon/ocr_logger.go diff --git a/.changeset/big-trees-help.md b/.changeset/big-trees-help.md new file mode 100644 index 00000000000..f826d257afa --- /dev/null +++ b/.changeset/big-trees-help.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +core/services: fix ocrWrapper saveError contexts #internal diff --git a/core/services/ocr/delegate.go b/core/services/ocr/delegate.go index 690e9ad7c71..e748823ad71 100644 --- a/core/services/ocr/delegate.go +++ b/core/services/ocr/delegate.go @@ -14,7 +14,6 @@ import ( ocr "github.com/smartcontractkit/libocr/offchainreporting" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types" - commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" @@ -155,9 +154,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] v2Bootstrappers = peerWrapper.P2PConfig().V2().DefaultBootstrappers() } - ocrLogger := commonlogger.NewOCRWrapper(lggr, d.cfg.OCR().TraceLogging(), func(msg string) { + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR().TraceLogging(), func(ctx context.Context, msg string) { d.jobORM.TryRecordError(ctx, jb.ID, msg) }) + services = append(services, ocrLogger) lc := toLocalConfig(chain.Config().EVM(), chain.Config().EVM().OCR(), d.cfg.Insecure(), *concreteSpec, 
d.cfg.OCR()) if err = ocr.SanityCheckLocalConfig(lc); err != nil { diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 1addae25601..e8db1cfe2b3 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -25,7 +25,6 @@ import ( ocr2keepers20runner "github.com/smartcontractkit/chainlink-automation/pkg/v2/runner" ocr2keepers21config "github.com/smartcontractkit/chainlink-automation/pkg/v3/config" ocr2keepers21 "github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin" - commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins/ocr3" @@ -397,10 +396,6 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi return nil, errors.New("peerWrapper is not started. OCR2 jobs require a started and running p2p v2 peer") } - ocrLogger := commonlogger.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(msg string) { - lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") - }) - lc, err := validate.ToLocalConfig(d.cfg.OCR2(), d.cfg.Insecure(), *spec) if err != nil { return nil, err @@ -438,22 +433,22 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi ctx = lggrCtx.ContextWithValues(ctx) switch spec.PluginType { case types.Mercury: - return d.newServicesMercury(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + return d.newServicesMercury(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc) case types.LLO: - return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc) case types.Median: - return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, kvStore, ocrDB, lc, ocrLogger) + return d.newServicesMedian(ctx, lggr, jb, 
bootstrapPeers, kb, kvStore, ocrDB, lc) case types.DKG: - return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc) case types.OCR2VRF: return d.newServicesOCR2VRF(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc) case types.OCR2Keeper: - return d.newServicesOCR2Keepers(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger) + return d.newServicesOCR2Keepers(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc) case types.Functions: const ( @@ -463,10 +458,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi ) thresholdPluginDB := NewDB(d.ds, spec.ID, thresholdPluginId, lggr) s4PluginDB := NewDB(d.ds, spec.ID, s4PluginId, lggr) - return d.newServicesOCR2Functions(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger) + return d.newServicesOCR2Functions(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc) case types.GenericPlugin: - return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.capabilitiesRegistry, + return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, d.capabilitiesRegistry, kvStore) default: @@ -525,7 +520,6 @@ func (d *Delegate) newServicesGenericPlugin( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, capabilitiesRegistry core.CapabilitiesRegistry, keyValueStore core.KeyValueStore, ) (srvs []job.ServiceCtx, err error) { @@ -655,6 +649,11 @@ func (d *Delegate) newServicesGenericPlugin( synchronization.TelemetryType(pCfg.TelemetryType), ) + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + srvs = append(srvs, ocrLogger) + switch pCfg.OCRVersion { case 2: plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, 
providerClientConn, pr, ta, @@ -723,7 +722,6 @@ func (d *Delegate) newServicesMercury( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { if jb.OCR2OracleSpec.FeedID == nil || (*jb.OCR2OracleSpec.FeedID == (common.Hash{})) { return nil, errors.Errorf("ServicesForSpec: mercury job type requires feedID") @@ -775,6 +773,10 @@ func (d *Delegate) newServicesMercury( // https://smartcontract-it.atlassian.net/browse/MERC-3386 lc.ContractConfigTrackerPollInterval = 1 * time.Second // Mercury requires a fast poll interval, this is the fastest that libocr supports. See: https://github.com/smartcontractkit/offchain-reporting/pull/520 + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + oracleArgsNoPlugin := libocr2.MercuryOracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -803,6 +805,8 @@ func (d *Delegate) newServicesMercury( lggr.Infow("Enhanced telemetry is disabled for mercury job", "job", jb.Name) } + mercuryServices = append(mercuryServices, ocrLogger) + return mercuryServices, err2 } @@ -814,7 +818,6 @@ func (d *Delegate) newServicesLLO( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { lggr = logger.Sugared(lggr.Named("LLO")) spec := jb.OCR2OracleSpec @@ -906,6 +909,10 @@ func (d *Delegate) newServicesLLO( lggr.Infof("Using on-chain signing keys for LLO job %d (%s): %v", jb.ID, jb.Name.ValueOrZero(), kbm) kr := llo.NewOnchainKeyring(lggr, kbm) + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + cfg := llo.DelegateConfig{ Logger: lggr, DataSource: d.ds, @@ -934,7 +941,7 @@ func (d *Delegate) 
newServicesLLO( if err != nil { return nil, err } - return []job.ServiceCtx{provider, oracle}, nil + return []job.ServiceCtx{provider, ocrLogger, oracle}, nil } func (d *Delegate) newServicesMedian( @@ -946,7 +953,6 @@ func (d *Delegate) newServicesMedian( kvStore job.KVStore, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec @@ -955,6 +961,10 @@ func (d *Delegate) newServicesMedian( return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "median"} } + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + oracleArgsNoPlugin := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -988,6 +998,8 @@ func (d *Delegate) newServicesMedian( lggr.Infow("Enhanced telemetry is disabled for job", "job", jb.Name) } + medianServices = append(medianServices, ocrLogger) + return medianServices, err2 } @@ -998,7 +1010,6 @@ func (d *Delegate) newServicesDKG( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec rid, err := spec.RelayID() @@ -1028,6 +1039,9 @@ func (d *Delegate) newServicesDKG( if err2 != nil { return nil, err2 } + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) noopMonitoringEndpoint := telemetry.NoopAgent{} oracleArgsNoPlugin := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, @@ -1044,7 +1058,12 @@ func (d *Delegate) newServicesDKG( OnchainKeyring: kb, MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer), } - return dkg.NewDKGServices(jb, dkgProvider, 
lggr, ocrLogger, d.dkgSignKs, d.dkgEncryptKs, chain.Client(), oracleArgsNoPlugin, d.ds, chain.ID(), spec.Relay) + services, err := dkg.NewDKGServices(jb, dkgProvider, lggr, ocrLogger, d.dkgSignKs, d.dkgEncryptKs, chain.Client(), oracleArgsNoPlugin, d.ds, chain.ID(), spec.Relay) + if err != nil { + return nil, err + } + services = append(services, ocrLogger) + return services, nil } func (d *Delegate) newServicesOCR2VRF( @@ -1167,12 +1186,10 @@ func (d *Delegate) newServicesOCR2VRF( "jobName", jb.Name.ValueOrZero(), "jobID", jb.ID, ) - vrfLogger := commonlogger.NewOCRWrapper(l.With( - "vrfContractID", spec.ContractID), d.cfg.OCR2().TraceLogging(), func(msg string) { + vrfLogger := ocrcommon.NewOCRWrapper(l.With("vrfContractID", spec.ContractID), d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") }) - dkgLogger := commonlogger.NewOCRWrapper(l.With( - "dkgContractID", cfg.DKGContractAddress), d.cfg.OCR2().TraceLogging(), func(msg string) { + dkgLogger := ocrcommon.NewOCRWrapper(l.With("dkgContractID", cfg.DKGContractAddress), d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") }) dkgReportingPluginFactoryDecorator := func(wrapped ocrtypes.ReportingPluginFactory) ocrtypes.ReportingPluginFactory { @@ -1233,7 +1250,6 @@ func (d *Delegate) newServicesOCR2Keepers( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec var cfg ocr2keeper.PluginConfig @@ -1247,14 +1263,14 @@ func (d *Delegate) newServicesOCR2Keepers( switch cfg.ContractVersion { case "v2.1": - return d.newServicesOCR2Keepers21(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec) + return d.newServicesOCR2Keepers21(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, cfg, spec) case "v2.1+": // Future contracts of v2.1 
(v2.x) will use the same job spec as v2.1 - return d.newServicesOCR2Keepers21(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec) + return d.newServicesOCR2Keepers21(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, cfg, spec) case "v2.0": - return d.newServicesOCR2Keepers20(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec) + return d.newServicesOCR2Keepers20(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, cfg, spec) default: - return d.newServicesOCR2Keepers20(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec) + return d.newServicesOCR2Keepers20(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, cfg, spec) } } @@ -1266,7 +1282,6 @@ func (d *Delegate) newServicesOCR2Keepers21( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, cfg ocr2keeper.PluginConfig, spec *job.OCR2OracleSpec, ) ([]job.ServiceCtx, error) { @@ -1348,6 +1363,9 @@ func (d *Delegate) newServicesOCR2Keepers21( if cfg.ServiceQueueLength != 0 { conf.ServiceQueueLength = cfg.ServiceQueueLength } + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) dConf := ocr2keepers21.DelegateConfig{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, @@ -1394,6 +1412,7 @@ func (d *Delegate) newServicesOCR2Keepers21( keeperProvider.UpkeepStateStore(), keeperProvider.TransmitEventProvider(), pluginService, + ocrLogger, } if cfg.CaptureAutomationCustomTelemetry != nil && *cfg.CaptureAutomationCustomTelemetry || @@ -1422,7 +1441,6 @@ func (d *Delegate) newServicesOCR2Keepers20( kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, cfg ocr2keeper.PluginConfig, spec *job.OCR2OracleSpec, ) ([]job.ServiceCtx, error) { @@ -1498,6 +1516,10 @@ func (d *Delegate) newServicesOCR2Keepers20( CacheClean: conf.CacheEvictionInterval, } + ocrLogger := 
ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + dConf := ocr2keepers20.DelegateConfig{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -1532,6 +1554,7 @@ func (d *Delegate) newServicesOCR2Keepers20( keeperProvider, rgstry, logProvider, + ocrLogger, pluginService, }, nil } @@ -1546,7 +1569,6 @@ func (d *Delegate) newServicesOCR2Functions( thresholdOcrDB *db, s4OcrDB *db, lc ocrtypes.LocalConfig, - ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec @@ -1597,6 +1619,10 @@ func (d *Delegate) newServicesOCR2Functions( return nil, err } + ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) { + lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) + functionsOracleArgs := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, @@ -1682,7 +1708,7 @@ func (d *Delegate) newServicesOCR2Functions( return nil, errors.Wrap(err, "error calling NewFunctionsServices") } - return append([]job.ServiceCtx{functionsProvider, thresholdProvider, s4Provider}, functionsServices...), nil + return append([]job.ServiceCtx{functionsProvider, thresholdProvider, s4Provider, ocrLogger}, functionsServices...), nil } // errorLog implements [loop.ErrorLog] diff --git a/core/services/ocrbootstrap/delegate.go b/core/services/ocrbootstrap/delegate.go index 4f927faa009..fdcb68ceecc 100644 --- a/core/services/ocrbootstrap/delegate.go +++ b/core/services/ocrbootstrap/delegate.go @@ -9,7 +9,6 @@ import ( ocr "github.com/smartcontractkit/libocr/offchainreporting2plus" - commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" 
"github.com/smartcontractkit/chainlink-common/pkg/types" @@ -163,14 +162,15 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] "ContractTransmitterTransmitTimeout", lc.ContractTransmitterTransmitTimeout, "DatabaseTimeout", lc.DatabaseTimeout, ) + ocrLogger := ocrcommon.NewOCRWrapper(lggr.Named("OCRBootstrap"), d.ocr2Cfg.TraceLogging(), func(ctx context.Context, msg string) { + logger.Sugared(lggr).ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") + }) bootstrapNodeArgs := ocr.BootstrapperArgs{ - BootstrapperFactory: d.peerWrapper.Peer2, - ContractConfigTracker: configProvider.ContractConfigTracker(), - Database: NewDB(d.ds, spec.ID, lggr), - LocalConfig: lc, - Logger: commonlogger.NewOCRWrapper(lggr.Named("OCRBootstrap"), d.ocr2Cfg.TraceLogging(), func(msg string) { - logger.Sugared(lggr).ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error") - }), + BootstrapperFactory: d.peerWrapper.Peer2, + ContractConfigTracker: configProvider.ContractConfigTracker(), + Database: NewDB(d.ds, spec.ID, lggr), + LocalConfig: lc, + Logger: ocrLogger, OffchainConfigDigester: configProvider.OffchainConfigDigester(), } lggr.Debugw("Launching new bootstrap node", "args", bootstrapNodeArgs) @@ -178,7 +178,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] if err != nil { return nil, errors.Wrap(err, "error calling NewBootstrapNode") } - return []job.ServiceCtx{configProvider, job.NewServiceAdapter(bootstrapper)}, nil + return []job.ServiceCtx{configProvider, ocrLogger, job.NewServiceAdapter(bootstrapper)}, nil } // AfterJobCreated satisfies the job.Delegate interface. 
diff --git a/core/services/ocrcommon/ocr_logger.go b/core/services/ocrcommon/ocr_logger.go
new file mode 100644
index 00000000000..50a8c9adc71
--- /dev/null
+++ b/core/services/ocrcommon/ocr_logger.go
@@ -0,0 +1,46 @@
+package ocrcommon
+
+import (
+	"context"
+	"sync"
+
+	ocrtypes "github.com/smartcontractkit/libocr/commontypes"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	"github.com/smartcontractkit/chainlink-common/pkg/services"
+)
+
+// ocrLoggerService pairs an OCR logger with a stop channel and provides
+// Start/Close methods so it can be managed alongside a job's other services.
+type ocrLoggerService struct {
+	stopCh    services.StopChan
+	closeOnce sync.Once // guards close(stopCh): closing a closed channel panics
+	ocrtypes.Logger
+}
+
+// NewOCRWrapper builds the logger via logger.NewOCRWrapper, adapting saveError
+// so that every invocation receives its own context obtained from the stop
+// channel; that context is cancelled as soon as saveError returns.
+func NewOCRWrapper(l logger.Logger, trace bool, saveError func(context.Context, string)) *ocrLoggerService {
+	stopCh := make(services.StopChan)
+	return &ocrLoggerService{
+		stopCh: stopCh,
+		Logger: logger.NewOCRWrapper(l, trace, func(s string) {
+			// NOTE(review): NewCtx presumably yields a context that is also
+			// cancelled when stopCh is closed - confirm in chainlink-common.
+			ctx, cancel := stopCh.NewCtx()
+			defer cancel()
+			saveError(ctx, s)
+		}),
+	}
+}
+
+// Start is a no-op and always returns nil.
+func (*ocrLoggerService) Start(context.Context) error { return nil }
+
+// Close closes the stop channel and always returns nil. It is safe to call
+// more than once: only the first call closes the channel.
+func (s *ocrLoggerService) Close() error {
+	s.closeOnce.Do(func() { close(s.stopCh) })
+	return nil
+}