Skip to content

Commit

Permalink
[KS-90] Add database-backed store (#13045)
Browse files Browse the repository at this point in the history
* [KS-90] Add database-backed store

* [KS-90] Add database-backed store
  • Loading branch information
cedric-cordenier committed May 6, 2024
1 parent 09f8c7f commit 0955d46
Show file tree
Hide file tree
Showing 14 changed files with 1,249 additions and 306 deletions.
4 changes: 4 additions & 0 deletions core/services/chainlink/application.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/google/uuid"
"github.com/grafana/pyroscope-go"
"github.com/jonboulle/clockwork"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -63,6 +64,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -319,6 +321,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
jobORM = job.NewORM(opts.DS, pipelineORM, bridgeORM, keyStore, globalLogger)
txmORM = txmgr.NewTxStore(opts.DS, globalLogger)
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
workflowORM = workflowstore.NewDBStore(opts.DS, clockwork.NewRealClock())
)

for _, chain := range legacyEVMChains.Slice() {
Expand Down Expand Up @@ -388,6 +391,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger,
registry,
legacyEVMChains,
workflowORM,
func() *p2ptypes.PeerID {
if externalPeerWrapper == nil {
return nil
Expand Down
7 changes: 5 additions & 2 deletions core/services/workflows/delegate.go
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

type Delegate struct {
registry core.CapabilitiesRegistry
logger logger.Logger
legacyEVMChains legacyevm.LegacyChainContainer
peerID func() *p2ptypes.PeerID
store store.Store
}

var _ job.Delegate = (*Delegate)(nil)
Expand Down Expand Up @@ -58,6 +60,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
Registry: d.registry,
DONInfo: dinfo,
PeerID: d.peerID,
Store: d.store,
}
engine, err := NewEngine(cfg)
if err != nil {
Expand Down Expand Up @@ -103,8 +106,8 @@ func initializeDONInfo(lggr logger.Logger) (*capabilities.DON, error) {
}, nil
}

func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, peerID func() *p2ptypes.PeerID) *Delegate {
return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains, peerID: peerID}
func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, store store.Store, peerID func() *p2ptypes.PeerID) *Delegate {
return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains, store: store, peerID: peerID}
}

func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
Expand Down

0 comments on commit 0955d46

Please sign in to comment.