Skip to content

Commit

Permalink
Merge pull request #82 from s7techlab/event-block-range-refactoring
Browse files Browse the repository at this point in the history
block range refactoring, allows to set seek to block
  • Loading branch information
vitiko committed Dec 29, 2021
2 parents 9093b9f + 4fea277 commit 01a9626
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 184 deletions.
26 changes: 19 additions & 7 deletions api/peer_deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
)

var (
oldest = &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}
newest = &orderer.SeekPosition{Type: &orderer.SeekPosition_Newest{Newest: &orderer.SeekNewest{}}}
maxStop = &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}
SeekFromOldest = &orderer.SeekPosition{
Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}
SeekFromNewest = &orderer.SeekPosition{
Type: &orderer.SeekPosition_Newest{Newest: &orderer.SeekNewest{}}}
SeekToMax = SeekSpecified(math.MaxUint64)
)

type DeliverClient interface {
Expand All @@ -32,14 +34,14 @@ type EventCCSeekOption func() (*orderer.SeekPosition, *orderer.SeekPosition)
// SeekNewest sets offset to new channel blocks
func SeekNewest() EventCCSeekOption {
return func() (*orderer.SeekPosition, *orderer.SeekPosition) {
return newest, maxStop
return SeekFromNewest, SeekToMax
}
}

// SeekOldest sets offset to channel blocks from beginning
func SeekOldest() EventCCSeekOption {
return func() (*orderer.SeekPosition, *orderer.SeekPosition) {
return oldest, maxStop
return SeekFromOldest, SeekToMax
}
}

Expand All @@ -51,11 +53,21 @@ func SeekSingle(num uint64) EventCCSeekOption {
}
}

// SeekSpecified returns orderer.SeekPosition_Specified position
func SeekSpecified(number uint64) *orderer.SeekPosition {
return &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: number}}}
}

// SeekRange sets offset from one block to another by their numbers
func SeekRange(start, end uint64) EventCCSeekOption {
var seekFrom *orderer.SeekPosition
if start == 0 {
seekFrom = SeekFromOldest
} else {
seekFrom = SeekSpecified(start)
}
return func() (*orderer.SeekPosition, *orderer.SeekPosition) {
return &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: start}}},
&orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: end}}}
return seekFrom, SeekSpecified(end)
}
}

Expand Down
177 changes: 0 additions & 177 deletions client/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package client
import (
"context"
"fmt"
"math"
"sync"
"time"

ordererproto "github.com/hyperledger/fabric-protos-go/orderer"
fabPeer "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/chaincode/platforms/golang"
"github.com/hyperledger/fabric/msp"
"github.com/pkg/errors"
Expand All @@ -18,7 +15,6 @@ import (
"github.com/s7techlab/hlf-sdk-go/v2/api/config"
"github.com/s7techlab/hlf-sdk-go/v2/client/chaincode"
"github.com/s7techlab/hlf-sdk-go/v2/client/chaincode/system"
"github.com/s7techlab/hlf-sdk-go/v2/client/chaincode/txwaiter"
"github.com/s7techlab/hlf-sdk-go/v2/client/channel"
"github.com/s7techlab/hlf-sdk-go/v2/client/fetcher"
"github.com/s7techlab/hlf-sdk-go/v2/crypto"
Expand Down Expand Up @@ -143,179 +139,6 @@ func (c *core) FabricV2() bool {
return c.fabricV2
}

func (c *core) SeekOptByBlockRange(ctx context.Context, channel string, blockRangeFrom, blockRangeTo int64) (api.EventCCSeekOption, error) {
var seekFrom, seekTo uint64

c.logger.Debug(`seek by block range`,
zap.Int64(`from`, blockRangeFrom), zap.Int64(`to`, blockRangeTo))

// Same as seek Newest
if blockRangeFrom == 0 && blockRangeTo == 0 {
// no seek opt, no error
return nil, nil
}

switch {
case blockRangeFrom == 0:
fallthrough
case blockRangeFrom > 0:
seekFrom = uint64(blockRangeFrom)
case blockRangeFrom < 0:

// from -{x} means we need to look x blocks back for events
// thus we need to know current channel height
channelInfo, err := c.System().QSCC().GetChainInfo(ctx, channel)
if err != nil {
return nil, fmt.Errorf(`get channel height: %w`, err)
}
c.logger.Debug(`get channel info for calculate negative block from`,
zap.Uint64(`channel_height`, channelInfo.Height))
seekFrom = uint64(int64(channelInfo.Height) + blockRangeFrom)
}

switch {
case blockRangeTo == 0:
seekTo = math.MaxUint64
case blockRangeTo > 0:
seekTo = uint64(blockRangeTo)
case blockRangeTo < 0:
seekTo = math.MaxUint64 - uint64(-blockRangeTo)
}

c.logger.Debug(`seek opts`,
zap.Uint64(`seek from`, seekFrom),
zap.Uint64(`seek to`, seekTo))

return func() (*ordererproto.SeekPosition, *ordererproto.SeekPosition) {
return &ordererproto.SeekPosition{
Type: &ordererproto.SeekPosition_Specified{Specified: &ordererproto.SeekSpecified{Number: seekFrom}}},
&ordererproto.SeekPosition{
Type: &ordererproto.SeekPosition_Specified{Specified: &ordererproto.SeekSpecified{Number: seekTo}}}

}, nil

}

func (c *core) Events(
ctx context.Context,
chanName string,
ccName string,
identity msp.SigningIdentity,
blockRange ...int64,
) (chan *fabPeer.ChaincodeEvent, error) {
if identity == nil {
identity = c.CurrentIdentity()
}

c.logger.Debug(`block range`, zap.Reflect(`slice`, blockRange))
mspID := identity.GetMSPIdentifier()

dc, err := c.PeerPool().DeliverClient(mspID, identity)
if err != nil {
return nil, err
}

var (
blockRangeFrom, blockRangeTo int64
seekOpts []api.EventCCSeekOption
)

if len(blockRange) > 0 {
blockRangeFrom = blockRange[0]
}

if len(blockRange) > 1 {
blockRangeTo = blockRange[1]
}

seekOpt, err := c.SeekOptByBlockRange(ctx, chanName, blockRangeFrom, blockRangeTo)
if err != nil {
return nil, err
}

if seekOpt != nil {
seekOpts = append(seekOpts, seekOpt)
}

subcription, err := dc.SubscribeCC(ctx, chanName, ccName, seekOpts...)
if err != nil {
return nil, err
}

return subcription.Events(), nil
}

func (c *core) Invoke(
ctx context.Context,
chanName string,
ccName string,
args [][]byte,
identity msp.SigningIdentity,
transient map[string][]byte,
txWaiterType string,
) (*fabPeer.Response, string, error) {
doOpts := []api.DoOption{}

switch txWaiterType {
case "":
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.Self))
case api.TxWaiterSelfType:
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.Self))
case api.TxWaiterAllType:
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.All))
default:
return nil, "", fmt.Errorf("invalid tx waiter type. got %v, available: '%v', '%v'", txWaiterType, api.TxWaiterSelfType, api.TxWaiterAllType)
}

if identity == nil {
identity = c.CurrentIdentity()
}

ccAPI, err := c.Channel(chanName).Chaincode(ctx, ccName)
if err != nil {
return nil, "", err
}

res, tx, err := ccAPI.Invoke(string(args[0])).
ArgBytes(args[1:]).
WithIdentity(identity).
Transient(transient).
Do(ctx, doOpts...)
if err != nil {
return nil, "", err
}

return res, string(tx), nil
}

func (c *core) Query(
ctx context.Context,
chanName string,
ccName string,
args [][]byte,
identity msp.SigningIdentity,
transient map[string][]byte,
) (*fabPeer.Response, error) {
if identity == nil {
identity = c.CurrentIdentity()
}

ccAPI, err := c.Channel(chanName).Chaincode(ctx, ccName)
if err != nil {
return nil, err
}

argsStrings := make([]string, 0)
for _, arg := range args {
argsStrings = append(argsStrings, string(arg))
}

return ccAPI.Query(argsStrings[0], argsStrings[1:]...).
WithIdentity(identity).
Transient(transient).
Do(ctx)
}

func NewCore(mspId string, identity api.Identity, opts ...CoreOpt) (api.Core, error) {
var err error
core := &core{
Expand Down
123 changes: 123 additions & 0 deletions client/core_sugared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package client

import (
"context"
"fmt"

fabPeer "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/msp"
"go.uber.org/zap"

"github.com/s7techlab/hlf-sdk-go/v2/api"
"github.com/s7techlab/hlf-sdk-go/v2/client/chaincode"
"github.com/s7techlab/hlf-sdk-go/v2/client/chaincode/txwaiter"
)

func (c *core) Invoke(
ctx context.Context,
chanName string,
ccName string,
args [][]byte,
identity msp.SigningIdentity,
transient map[string][]byte,
txWaiterType string,
) (*fabPeer.Response, string, error) {
doOpts := []api.DoOption{}

switch txWaiterType {
case "":
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.Self))
case api.TxWaiterSelfType:
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.Self))
case api.TxWaiterAllType:
doOpts = append(doOpts, chaincode.WithTxWaiter(txwaiter.All))
default:
return nil, "", fmt.Errorf("invalid tx waiter type. got %v, available: '%v', '%v'", txWaiterType, api.TxWaiterSelfType, api.TxWaiterAllType)
}

if identity == nil {
identity = c.CurrentIdentity()
}

ccAPI, err := c.Channel(chanName).Chaincode(ctx, ccName)
if err != nil {
return nil, "", err
}

res, tx, err := ccAPI.Invoke(string(args[0])).
ArgBytes(args[1:]).
WithIdentity(identity).
Transient(transient).
Do(ctx, doOpts...)
if err != nil {
return nil, "", err
}

return res, string(tx), nil
}

func (c *core) Query(
ctx context.Context,
chanName string,
ccName string,
args [][]byte,
identity msp.SigningIdentity,
transient map[string][]byte,
) (*fabPeer.Response, error) {
if identity == nil {
identity = c.CurrentIdentity()
}

ccAPI, err := c.Channel(chanName).Chaincode(ctx, ccName)
if err != nil {
return nil, err
}

argsStrings := make([]string, 0)
for _, arg := range args {
argsStrings = append(argsStrings, string(arg))
}

return ccAPI.Query(argsStrings[0], argsStrings[1:]...).
WithIdentity(identity).
Transient(transient).
Do(ctx)
}

func (c *core) Events(
ctx context.Context,
chanName string,
ccName string,
identity msp.SigningIdentity,
blockRange ...int64,
) (chan *fabPeer.ChaincodeEvent, error) {

if identity == nil {
identity = c.CurrentIdentity()
}

c.logger.Debug(`block range`, zap.Reflect(`slice`, blockRange))
mspID := identity.GetMSPIdentifier()

dc, err := c.PeerPool().DeliverClient(mspID, identity)
if err != nil {
return nil, err
}

var seekOpts []api.EventCCSeekOption
seekOpt, err := NewSeekOptConverter(c).ByBlockRange(ctx, chanName, blockRange...)
if err != nil {
return nil, err
}

if seekOpt != nil {
seekOpts = append(seekOpts, seekOpt)
}

subscription, err := dc.SubscribeCC(ctx, chanName, ccName, seekOpts...)
if err != nil {
return nil, err
}

return subscription.Events(), nil
}
Loading

0 comments on commit 01a9626

Please sign in to comment.