-
Notifications
You must be signed in to change notification settings - Fork 17
/
observer.go
112 lines (86 loc) · 2.7 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package observer
import (
"github.com/trustbloc/logutil-go/pkg/log"
"github.com/trustbloc/sidetree-core-go/pkg/api/operation"
"github.com/trustbloc/sidetree-core-go/pkg/api/protocol"
"github.com/trustbloc/sidetree-core-go/pkg/api/txn"
logfields "github.com/trustbloc/sidetree-core-go/pkg/internal/log"
)
var logger = log.New("sidetree-core-observer")
// Ledger interface to access ledger txn.
type Ledger interface {
RegisterForSidetreeTxn() <-chan []txn.SidetreeTxn
}
// OperationStore interface to access operation store.
type OperationStore interface {
Put(ops []*operation.AnchoredOperation) error
}
// OperationFilter filters out operations before they are persisted.
type OperationFilter interface {
Filter(uniqueSuffix string, ops []*operation.AnchoredOperation) ([]*operation.AnchoredOperation, error)
}
// Providers contains all of the providers required by the TxnProcessor.
type Providers struct {
Ledger Ledger
ProtocolClientProvider protocol.ClientProvider
}
// Observer receives transactions over a channel and processes them by storing them to an operation store.
type Observer struct {
*Providers
stopCh chan struct{}
}
// New returns a new observer.
func New(providers *Providers) *Observer {
return &Observer{
Providers: providers,
stopCh: make(chan struct{}, 1),
}
}
// Start starts observer routines.
func (o *Observer) Start() {
go o.listen(o.Ledger.RegisterForSidetreeTxn())
}
// Stop stops the observer.
func (o *Observer) Stop() {
o.stopCh <- struct{}{}
}
func (o *Observer) listen(txnsCh <-chan []txn.SidetreeTxn) {
for {
select {
case <-o.stopCh:
logger.Info("The observer has been stopped. Exiting.")
return
case txns, ok := <-txnsCh:
if !ok {
logger.Warn("Notification channel was closed. Exiting.")
return
}
o.process(txns)
}
}
}
func (o *Observer) process(txns []txn.SidetreeTxn) {
for _, txn := range txns {
pc, err := o.ProtocolClientProvider.ForNamespace(txn.Namespace)
if err != nil {
logger.Warn("Failed to get protocol client for namespace", logfields.WithNamespace(txn.Namespace), log.WithError(err))
continue
}
v, err := pc.Get(txn.ProtocolVersion)
if err != nil {
logger.Warn("Failed to get processor for transaction time", logfields.WithGenesisTime(txn.ProtocolVersion),
log.WithError(err))
continue
}
_, err = v.TransactionProcessor().Process(txn)
if err != nil {
logger.Warn("Failed to process anchor", logfields.WithAnchorString(txn.AnchorString), log.WithError(err))
continue
}
logger.Debug("Successfully processed anchor", logfields.WithAnchorString(txn.AnchorString))
}
}