-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
63 lines (53 loc) · 1.55 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright (C) 2022, Chain4Travel AG. All rights reserved.
//
// This file is a derived work, based on ava-labs code whose
// original notices appear below.
//
// It is distributed under the same license conditions as the
// original code from which it is derived.
//
// Much love to the original authors for their work.
// **********************************************************
// (c) 2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package stream
import (
"context"
"errors"
"time"
"github.com/lasthyphen/mages/cfg"
"github.com/lasthyphen/mages/db"
"github.com/lasthyphen/mages/servicesctrl"
"github.com/lasthyphen/mages/utils"
)
var (
// processorFailureRetryInterval is a fixed 200ms duration.
// NOTE(review): by its name this is the wait before retrying after a
// processor failure; its use is not visible in this section — confirm.
processorFailureRetryInterval = 200 * time.Millisecond
// ErrNoMessage is a sentinel error with the text "no message".
// NOTE(review): presumably signals that there was nothing to consume;
// confirm against the callers that return or compare it.
ErrNoMessage = errors.New("no message")
)
type (
// ProcessorFactoryChainDB is the signature of a constructor that builds a
// ProcessorDB from a service control handle, the configuration and two
// string arguments.
// NOTE(review): the meaning of the two strings is not visible here
// (presumably chain identifiers) — confirm against the implementations.
ProcessorFactoryChainDB func(*servicesctrl.Control, cfg.Config, string, string) (ProcessorDB, error)
// ProcessorFactoryInstDB is the signature of a constructor that builds a
// ProcessorDB from a service control handle and the configuration only.
ProcessorFactoryInstDB func(*servicesctrl.Control, cfg.Config) (ProcessorDB, error)
)
// ProcessorDB is the contract implemented by the processors produced by
// ProcessorFactoryChainDB and ProcessorFactoryInstDB.
type ProcessorDB interface {
// Process is given the shared connections and a transaction-pool entry
// and reports an error on failure.
// NOTE(review): presumably it handles the given TxPool entry; the
// implementations are not visible here — confirm.
Process(*utils.Connections, *db.TxPool) error
// Close returns an error on failure.
// NOTE(review): presumably releases resources held by the processor — confirm.
Close() error
// ID returns a string identifier for the processor.
ID() string
// Topic returns a list of topic names.
// NOTE(review): presumably the topics this processor handles — confirm.
Topic() []string
}
// UpdateTxPool inserts txPool through persist and, if the insert succeeds,
// hands the same entry to sc via Enqueue.
//
// The insert runs on a new DB session obtained from conns, whose event
// receiver is a stream job named "update-tx-pool", under a context derived
// from context.Background() that times out after ctxTimeout.
//
// It returns the error from InsertTxPool unchanged; txPool is enqueued only
// when that error is nil.
func UpdateTxPool(
	ctxTimeout time.Duration,
	conns *utils.Connections,
	persist db.Persist,
	txPool *db.TxPool,
	sc *servicesctrl.Control,
) error {
	// Resolve the DB handle before creating the job so the calls happen in
	// the same order as a single chained expression would evaluate them.
	dbConn := conns.DB()
	job := conns.Stream().NewJob("update-tx-pool")
	session := dbConn.NewSessionForEventReceiver(job)

	ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
	defer cancel()

	if err := persist.InsertTxPool(ctx, session, txPool); err != nil {
		return err
	}

	// Only entries that were persisted successfully are queued.
	sc.Enqueue(txPool)
	return nil
}