Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thread contexts #36

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions internal/mock/shiroclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ type mockShiroClient struct {
shiroPhylum string
}

func (c *mockShiroClient) flatten(configs ...types.Config) (*plugin.ConcreteRequestOptions, error) {
ctx := context.TODO()
opt := types.ApplyConfigs(ctx, nil, append(c.baseConfig, configs...)...)
func (c *mockShiroClient) flatten(ctx context.Context, configs ...types.Config) (*plugin.ConcreteRequestOptions, error) {
opt := types.ApplyConfigs(nil, append(c.baseConfig, configs...)...)

params, err := json.Marshal(opt.Params)
if err != nil {
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *mockShiroClient) flatten(configs ...types.Config) (*plugin.ConcreteRequ
AuthToken: opt.AuthToken,
Params: params,
Transient: opt.Transient,
Timestamp: tsg(opt.Ctx, opt.TimestampGenerator),
Timestamp: tsg(ctx, opt.TimestampGenerator),
MSPFilter: opt.MspFilter,
MinEndorsers: opt.MinEndorsers,
Creator: opt.Creator,
Expand All @@ -83,18 +82,18 @@ func (c *mockShiroClient) flatten(configs ...types.Config) (*plugin.ConcreteRequ
}

// Seed implements the ShiroClient interface.
func (c *mockShiroClient) Seed(version string, configs ...types.Config) error {
func (c *mockShiroClient) Seed(_ context.Context, version string, configs ...types.Config) error {
return fmt.Errorf("Seed(...) is not supported")
}

// ShiroPhylum implements the ShiroClient interface.
func (c *mockShiroClient) ShiroPhylum(configs ...types.Config) (string, error) {
func (c *mockShiroClient) ShiroPhylum(_ context.Context, configs ...types.Config) (string, error) {
return c.shiroPhylum, nil
}

// Init implements the ShiroClient interface.
func (c *mockShiroClient) Init(phylum string, configs ...types.Config) error {
cro, err := c.flatten(configs...)
func (c *mockShiroClient) Init(ctx context.Context, phylum string, configs ...types.Config) error {
cro, err := c.flatten(ctx, configs...)
if err != nil {
return err
}
Expand All @@ -103,7 +102,7 @@ func (c *mockShiroClient) Init(phylum string, configs ...types.Config) error {

// Call implements the ShiroClient interface.
func (c *mockShiroClient) Call(ctx context.Context, method string, configs ...types.Config) (types.ShiroResponse, error) {
cro, err := c.flatten(configs...)
cro, err := c.flatten(ctx, configs...)
if err != nil {
return nil, err
}
Expand All @@ -121,8 +120,8 @@ func (c *mockShiroClient) Call(ctx context.Context, method string, configs ...ty
}

// QueryInfo implements the ShiroClient interface.
func (c *mockShiroClient) QueryInfo(configs ...types.Config) (uint64, error) {
cro, err := c.flatten(configs...)
func (c *mockShiroClient) QueryInfo(ctx context.Context, configs ...types.Config) (uint64, error) {
cro, err := c.flatten(ctx, configs...)
if err != nil {
return 0, err
}
Expand All @@ -131,8 +130,8 @@ func (c *mockShiroClient) QueryInfo(configs ...types.Config) (uint64, error) {
}

// QueryBlock implements the ShiroClient interface.
func (c *mockShiroClient) QueryBlock(blockNumber uint64, configs ...types.Config) (types.Block, error) {
cro, err := c.flatten(configs...)
func (c *mockShiroClient) QueryBlock(ctx context.Context, blockNumber uint64, configs ...types.Config) (types.Block, error) {
cro, err := c.flatten(ctx, configs...)
if err != nil {
return nil, err
}
Expand Down
52 changes: 21 additions & 31 deletions internal/rpc/shiroclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *rpcShiroClient) doRequest(ctx context.Context, httpClient *http.Client,
// reqres is a round-trip "request/response" helper. Marshals "req",
// logs it at debug level, makes the HTTP request, reads and logs the
// response at debug level, unmarshals, parses into rpcres.
func (c *rpcShiroClient) reqres(req interface{}, opt *types.RequestOptions) (*rpcres, error) {
func (c *rpcShiroClient) reqres(ctx context.Context, req interface{}, opt *types.RequestOptions) (*rpcres, error) {
outmsg, err := json.Marshal(req)
if err != nil {
return nil, err
Expand All @@ -172,11 +172,6 @@ func (c *rpcShiroClient) reqres(req interface{}, opt *types.RequestOptions) (*rp
return nil, errors.New("ShiroClient.reqres expected an endpoint to be set")
}

ctx := opt.Ctx
if ctx == nil {
ctx = context.Background()
}

httpReq, err := http.NewRequest("POST", opt.Endpoint, bytes.NewReader(outmsg))
if err != nil {
return nil, err
Expand Down Expand Up @@ -286,11 +281,11 @@ func (c *rpcShiroClient) reqres(req interface{}, opt *types.RequestOptions) (*rp

// applyConfigs applies configs -- baseConfigs supplied in the
// constructor first, followed by configs arguments.
func (c *rpcShiroClient) applyConfigs(ctx context.Context, configs ...types.Config) (*types.RequestOptions, error) {
func (c *rpcShiroClient) applyConfigs(configs ...types.Config) (*types.RequestOptions, error) {
tConfigs := make([]types.Config, 0, len(c.baseConfig)+len(configs))
tConfigs = append(tConfigs, c.baseConfig...)
tConfigs = append(tConfigs, configs...)
return types.ApplyConfigs(ctx, c.defaultLog, tConfigs...), nil
return types.ApplyConfigs(c.defaultLog, tConfigs...), nil
}

// HealthCheck uses the RPC gateway server's health endpoint to check
Expand All @@ -299,7 +294,7 @@ func (c *rpcShiroClient) applyConfigs(ctx context.Context, configs ...types.Conf
// the RemoteHealthCheck function.
func (c *rpcShiroClient) HealthCheck(ctx context.Context, services []string, configs ...types.Config) (HealthCheck, error) {
// Validate config and transform params
opt, err := c.applyConfigs(ctx, configs...)
opt, err := c.applyConfigs(configs...)
if err != nil {
return nil, fmt.Errorf("healthcheck config: %w", err)
}
Expand Down Expand Up @@ -364,9 +359,8 @@ func urlQueryAppend(u *url.URL, vals url.Values) {
}

// Seed implements the ShiroClient interface.
func (c *rpcShiroClient) Seed(version string, configs ...types.Config) error {
ctx := context.TODO()
opt, err := c.applyConfigs(ctx, configs...)
func (c *rpcShiroClient) Seed(ctx context.Context, version string, configs ...types.Config) error {
opt, err := c.applyConfigs(configs...)
if err != nil {
return err
}
Expand All @@ -380,7 +374,7 @@ func (c *rpcShiroClient) Seed(version string, configs ...types.Config) error {
},
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return err
}
Expand All @@ -398,9 +392,8 @@ func (c *rpcShiroClient) Seed(version string, configs ...types.Config) error {
}

// ShiroPhylum implements the ShiroClient interface.
func (c *rpcShiroClient) ShiroPhylum(configs ...types.Config) (string, error) {
ctx := context.TODO()
opt, err := c.applyConfigs(ctx, configs...)
func (c *rpcShiroClient) ShiroPhylum(ctx context.Context, configs ...types.Config) (string, error) {
opt, err := c.applyConfigs(configs...)
if err != nil {
return "", err
}
Expand All @@ -412,7 +405,7 @@ func (c *rpcShiroClient) ShiroPhylum(configs ...types.Config) (string, error) {
"params": map[string]interface{}{},
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return "", err
}
Expand All @@ -435,9 +428,8 @@ func (c *rpcShiroClient) ShiroPhylum(configs ...types.Config) (string, error) {
}

// Init implements the ShiroClient interface.
func (c *rpcShiroClient) Init(phylum string, configs ...types.Config) error {
ctx := context.TODO()
opt, err := c.applyConfigs(ctx, configs...)
func (c *rpcShiroClient) Init(ctx context.Context, phylum string, configs ...types.Config) error {
opt, err := c.applyConfigs(configs...)
if err != nil {
return err
}
Expand All @@ -451,7 +443,7 @@ func (c *rpcShiroClient) Init(phylum string, configs ...types.Config) error {
},
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return err
}
Expand All @@ -470,7 +462,7 @@ func (c *rpcShiroClient) Init(phylum string, configs ...types.Config) error {

// Call implements the ShiroClient interface.
func (c *rpcShiroClient) Call(ctx context.Context, method string, configs ...types.Config) (types.ShiroResponse, error) {
opt, err := c.applyConfigs(ctx, configs...)
opt, err := c.applyConfigs(configs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -531,7 +523,7 @@ func (c *rpcShiroClient) Call(ctx context.Context, method string, configs ...typ
req["params"].(map[string]interface{})["creator_msp_id"] = opt.Creator
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -572,9 +564,8 @@ func (c *rpcShiroClient) Call(ctx context.Context, method string, configs ...typ
}

// QueryInfo implements the ShiroClient interface.
func (c *rpcShiroClient) QueryInfo(configs ...types.Config) (uint64, error) {
ctx := context.TODO()
opt, err := c.applyConfigs(ctx, configs...)
func (c *rpcShiroClient) QueryInfo(ctx context.Context, configs ...types.Config) (uint64, error) {
opt, err := c.applyConfigs(configs...)
if err != nil {
return 0, err
}
Expand All @@ -586,7 +577,7 @@ func (c *rpcShiroClient) QueryInfo(configs ...types.Config) (uint64, error) {
"params": map[string]interface{}{},
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return 0, err
}
Expand All @@ -609,9 +600,8 @@ func (c *rpcShiroClient) QueryInfo(configs ...types.Config) (uint64, error) {
}

// QueryBlock implements the ShiroClient interface.
func (c *rpcShiroClient) QueryBlock(blockNumber uint64, configs ...types.Config) (types.Block, error) {
ctx := context.TODO()
opt, err := c.applyConfigs(ctx, configs...)
func (c *rpcShiroClient) QueryBlock(ctx context.Context, blockNumber uint64, configs ...types.Config) (types.Block, error) {
opt, err := c.applyConfigs(configs...)
if err != nil {
return nil, err
}
Expand All @@ -623,7 +613,7 @@ func (c *rpcShiroClient) QueryBlock(blockNumber uint64, configs ...types.Config)
"params": map[string]interface{}{"block_number": float64(blockNumber)},
}

res, err := c.reqres(req, opt)
res, err := c.reqres(ctx, req, opt)
if err != nil {
return nil, err
}
Expand Down
18 changes: 6 additions & 12 deletions internal/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
type ShiroClient interface {
// Seed re-opens the ShiroClient, specifying the phylum version to
// target.
Seed(version string, config ...Config) error
Seed(ctx context.Context, version string, config ...Config) error

// ShiroPhylum returns a non-empty string which should act as an
// indentifier indicating the deployed phylum code being executed by
// the shiro server.
ShiroPhylum(config ...Config) (string, error)
ShiroPhylum(ctx context.Context, config ...Config) (string, error)

// Init initializes the chaincode given a string containing
// base64-encoded phylum code. The phylum code should be deployed
// with the identifier returned by method ShiroPhylum().
Init(phylum string, config ...Config) error
Init(ctx context.Context, phylum string, config ...Config) error

// Call executes method with the given parameters and commits the
// results. The method shuold be executed by the phylum code
Expand All @@ -45,11 +45,11 @@ type ShiroClient interface {
Call(ctx context.Context, method string, config ...Config) (ShiroResponse, error)

// QueryInfo returns the blockchain height.
QueryInfo(config ...Config) (uint64, error)
QueryInfo(ctx context.Context, config ...Config) (uint64, error)

// QueryBlock returns summary information about the block given by
// blockNumber.
QueryBlock(blockNumber uint64, config ...Config) (Block, error)
QueryBlock(ctx context.Context, blockNumber uint64, config ...Config) (Block, error)
}

type standardConfig struct {
Expand All @@ -72,23 +72,18 @@ type Config interface {
Fn(*RequestOptions)
}

func ApplyConfigs(ctx context.Context, log *logrus.Logger, configs ...Config) *RequestOptions {
func ApplyConfigs(log *logrus.Logger, configs ...Config) *RequestOptions {
uuid, err := uuid.NewRandom()
if err != nil {
panic(fmt.Errorf("uuid: %w", err))
}

if ctx == nil {
ctx = context.Background()
}

opt := &RequestOptions{
Log: log,
LogFields: make(logrus.Fields),
Headers: make(map[string]string),
ID: uuid.String(),
Transient: make(map[string][]byte),
Ctx: ctx,
}

for _, config := range configs {
Expand Down Expand Up @@ -116,7 +111,6 @@ type RequestOptions struct {
MspFilter []string
MinEndorsers int
Creator string
Ctx context.Context
DependentTxID string
DisableWritePolling bool
CcFetchURLDowngrade bool
Expand Down
6 changes: 3 additions & 3 deletions shiroclient/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func Test001(t *testing.T) {
require.NoError(t, err)
})

err = client.Init(shiroclient.EncodePhylumBytes(testPhylum))
ctx := context.Background()

err = client.Init(ctx, shiroclient.EncodePhylumBytes(testPhylum))
if err != nil {
t.Fatal(err)
}
Expand All @@ -68,8 +70,6 @@ func Test001(t *testing.T) {

lastReceivedMessage := "none"

ctx := context.Background()

ticker := driver.Register(ctx, "test_batch", time.Duration(1)*time.Hour, func(batchID string, requestID string, message json.RawMessage) (json.RawMessage, error) {
messageStr := string(message)
switch messageStr {
Expand Down
7 changes: 0 additions & 7 deletions shiroclient/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ func WithHTTPClient(client *http.Client) Config {
})
}

// WithContext allows specifying the context to use.
func WithContext(ctx context.Context) Config {
return types.Opt(func(r *types.RequestOptions) {
r.Ctx = ctx
})
}

// WithLog allows specifying the logger to use.
func WithLog(log *logrus.Logger) Config {
return types.Opt(func(r *types.RequestOptions) {
Expand Down
5 changes: 3 additions & 2 deletions shiroclient/private/private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ func newMockClient() (shiroclient.MockShiroClient, error) {
if err != nil {
return nil, err
}
version, err := client.ShiroPhylum()
ctx := context.Background()
version, err := client.ShiroPhylum(ctx)
if err != nil {
return nil, err
}
if version != "test" {
return nil, fmt.Errorf("expected version 'test'")
}
err = client.Init(shiroclient.EncodePhylumBytes(testPhylum))
err = client.Init(ctx, shiroclient.EncodePhylumBytes(testPhylum))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions shiroclient/shiroclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func call(client shiroclient.ShiroClient, method string, params interface{}, tra

func initClient(t *testing.T, client shiroclient.ShiroClient, phylum []byte) {
t.Helper()
err := client.Init(shiroclient.EncodePhylumBytes(phylum))
err := client.Init(context.Background(), shiroclient.EncodePhylumBytes(phylum))
require.NoError(t, err)
}

Expand All @@ -57,7 +57,7 @@ func TestHealth(t *testing.T) {
require.NoError(t, err)
})
initClient(t, client, testPhylum)
version, err := client.ShiroPhylum()
version, err := client.ShiroPhylum(context.Background())
require.NoError(t, err)
require.Equal(t, "test", version)

Expand Down
6 changes: 4 additions & 2 deletions shiroclient/update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func client(t *testing.T) shiroclient.ShiroClient {
t.Helper()
client, err := shiroclient.NewMock(nil)
require.NoError(t, err)
err = client.Init(shiroclient.EncodePhylumBytes(testPhylum))
err = client.Init(context.Background(), shiroclient.EncodePhylumBytes(testPhylum))
require.NoError(t, err)
return client
}
Expand Down Expand Up @@ -98,7 +98,9 @@ func TestInstall(t *testing.T) {
ctx := context.Background()

t.Run("init-2", func(t *testing.T) {
err := client.Init(shiroclient.EncodePhylumBytes(testPhylum), plugin.WithNewPhylumVersion("new"), shiroclient.WithContext(ctx))
err := client.Init(ctx,
shiroclient.EncodePhylumBytes(testPhylum),
plugin.WithNewPhylumVersion("new"))
require.NoError(t, err)

phyla, err := update.GetPhyla(ctx, client)
Expand Down
Loading
Loading