forked from rollkit/cometbft
/
client.go
134 lines (116 loc) · 4.16 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package proxy
import (
"fmt"
abcicli "github.com/cometbft/cometbft/abci/client"
"github.com/cometbft/cometbft/abci/example/kvstore"
"github.com/cometbft/cometbft/abci/types"
cmtsync "github.com/cometbft/cometbft/libs/sync"
e2e "github.com/cometbft/cometbft/test/e2e/app"
)
//go:generate ../scripts/mockery_generate.sh ClientCreator
// ClientCreator creates new ABCI clients.
type ClientCreator interface {
// NewABCIClient returns a new ABCI client.
NewABCIClient() (abcicli.Client, error)
}
//----------------------------------------------------
// local proxy uses a mutex on an in-proc app
type localClientCreator struct {
mtx *cmtsync.Mutex
app types.Application
}
// NewLocalClientCreator returns a [ClientCreator] for the given app, which
// will be running locally.
//
// Maintains a single mutex over all new clients created with NewABCIClient.
// For a local client creator that uses a single mutex per new client, rather
// use [NewUnsyncLocalClientCreator].
func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{
mtx: new(cmtsync.Mutex),
app: app,
}
}
func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewLocalClient(l.mtx, l.app), nil
}
//----------------------------------------------------
// local proxy creates a new mutex for each client
type unsyncLocalClientCreator struct {
app types.Application
}
// NewUnsyncLocalClientCreator returns a [ClientCreator] for the given app.
// Unlike [NewLocalClientCreator], each call to NewABCIClient returns an ABCI
// client that maintains its own mutex over the application.
func NewUnsyncLocalClientCreator(app types.Application) ClientCreator {
return &unsyncLocalClientCreator{
app: app,
}
}
func (c *unsyncLocalClientCreator) NewABCIClient() (abcicli.Client, error) {
// Specifying nil for the mutex causes each instance to create its own
// mutex.
return abcicli.NewLocalClient(nil, c.app), nil
}
//---------------------------------------------------------------
// remote proxy opens new connections to an external app process
type remoteClientCreator struct {
addr string
transport string
mustConnect bool
}
// NewRemoteClientCreator returns a ClientCreator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator {
return &remoteClientCreator{
addr: addr,
transport: transport,
mustConnect: mustConnect,
}
}
func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) {
remoteApp, err := abcicli.NewClient(r.addr, r.transport, r.mustConnect)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
}
return remoteApp, nil
}
// DefaultClientCreator returns a default [ClientCreator], which will create a
// local client if addr is one of "kvstore", "persistent_kvstore", "e2e",
// "noop".
//
// Otherwise a remote client will be created.
//
// Each of "kvstore", "persistent_kvstore" and "e2e" also currently have an
// "_unsync" variant (i.e. "kvstore_unsync", etc.), which attempts to replicate
// the same concurrency model as the remote client.
func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
switch addr {
case "kvstore":
return NewLocalClientCreator(kvstore.NewInMemoryApplication())
case "kvstore_unsync":
return NewUnsyncLocalClientCreator(kvstore.NewInMemoryApplication())
case "persistent_kvstore":
return NewLocalClientCreator(kvstore.NewPersistentApplication(dbDir))
case "persistent_kvstore_unsync":
return NewUnsyncLocalClientCreator(kvstore.NewPersistentApplication(dbDir))
case "e2e":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
}
return NewLocalClientCreator(app)
case "e2e_unsync":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
}
return NewUnsyncLocalClientCreator(app)
case "noop":
return NewLocalClientCreator(types.NewBaseApplication())
default:
mustConnect := false // loop retrying
return NewRemoteClientCreator(addr, transport, mustConnect)
}
}