-
Notifications
You must be signed in to change notification settings - Fork 0
/
chprovider.go
135 lines (114 loc) · 3.98 KB
/
chprovider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package chpvdr
import (
reqContext "context"
"github.com/vtbaas/vbaas-go-sdk/pkg/common/logging"
"github.com/vtbaas/vbaas-go-sdk/pkg/common/options"
"github.com/vtbaas/vbaas-go-sdk/pkg/common/providers/context"
"github.com/vtbaas/vbaas-go-sdk/pkg/common/providers/fab"
channelImpl "github.com/vtbaas/vbaas-go-sdk/pkg/fab/channel"
"github.com/vtbaas/vbaas-go-sdk/pkg/fab/chconfig"
"github.com/vtbaas/vbaas-go-sdk/pkg/util/concurrent/lazycache"
)
var logger = logging.NewLogger("fabsdk")
// ChannelProvider keeps context across ChannelService instances.
//
// TODO: add listener for channel config changes. Upon channel config change,
// underlying channel services need to recreate their channel clients.
type ChannelProvider struct {
providerContext context.Providers
ctxtCaches *lazycache.Cache
}
// New creates a ChannelProvider based on a context
func New(config fab.EndpointConfig, opts ...options.Opt) (*ChannelProvider, error) {
return &ChannelProvider{
ctxtCaches: lazycache.New(
"Client_Context_Cache",
func(key lazycache.Key) (interface{}, error) {
ck := key.(*ctxtCacheKey)
return newContextCache(ck.context, opts), nil
},
),
}, nil
}
// Initialize sets the provider context
func (cp *ChannelProvider) Initialize(providers context.Providers) error {
cp.providerContext = providers
return nil
}
// Close frees resources and caches.
func (cp *ChannelProvider) Close() {
cp.ctxtCaches.Close()
}
// CloseContext frees resources and caches for the given context.
func (cp *ChannelProvider) CloseContext(ctx fab.ClientContext) {
key, err := newCtxtCacheKey(ctx)
if err != nil {
logger.Warnf("Unable to close context: %s", err)
return
}
logger.Warnf("Deleting context cache...")
cp.ctxtCaches.Delete(key)
}
// ChannelService creates a ChannelService for an identity
func (cp *ChannelProvider) ChannelService(ctx fab.ClientContext, channelID string) (fab.ChannelService, error) {
key, err := newCtxtCacheKey(ctx)
if err != nil {
return nil, err
}
ctxtCache, err := cp.ctxtCaches.Get(key)
if err != nil {
// This should never happen since the creation of a cache never returns an error
return nil, err
}
return &ChannelService{
provider: cp,
context: ctx,
channelID: channelID,
ctxtCache: ctxtCache.(*contextCache),
}, nil
}
// ChannelService provides Channel clients and maintains contexts for them.
// the identity context is used
type ChannelService struct {
provider *ChannelProvider
context context.Client
channelID string
ctxtCache *contextCache
}
// Config returns the Config for the named channel
func (cs *ChannelService) Config() (fab.ChannelConfig, error) {
return chconfig.New(cs.channelID)
}
// EventService returns the EventService.
func (cs *ChannelService) EventService(opts ...options.Opt) (fab.EventService, error) {
return cs.ctxtCache.GetEventService(cs.channelID, opts...)
}
// Membership returns and caches a channel member identifier
// A membership reference is returned that refreshes with the configured interval
func (cs *ChannelService) Membership() (fab.ChannelMembership, error) {
return cs.ctxtCache.GetMembership(cs.channelID)
}
// ChannelConfig returns the channel config for this channel
func (cs *ChannelService) ChannelConfig() (fab.ChannelCfg, error) {
return cs.ctxtCache.GetChannelConfig(cs.channelID)
}
// Transactor returns the transactor
func (cs *ChannelService) Transactor(reqCtx reqContext.Context) (fab.Transactor, error) {
cfg, err := cs.ChannelConfig()
if err != nil {
return nil, err
}
return channelImpl.NewTransactor(reqCtx, cfg)
}
// Discovery returns a DiscoveryService for the given channel
func (cs *ChannelService) Discovery() (fab.DiscoveryService, error) {
return cs.ctxtCache.GetDiscoveryService(cs.channelID)
}
// Selection returns a SelectionService for the given channel
func (cs *ChannelService) Selection() (fab.SelectionService, error) {
return cs.ctxtCache.GetSelectionService(cs.channelID)
}