Skip to content

Commit

Permalink
write-struct 18
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Oct 23, 2023
1 parent b282c6a commit a465395
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 31 deletions.
4 changes: 2 additions & 2 deletions block/block_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func (x *Block) ValidEnvelopes() []*Envelope {
var envs []*Envelope
for _, e := range x.Data.Envelopes {
for _, e := range x.GetData().GetEnvelopes() {
if e.ValidationCode != peer.TxValidationCode_VALID {
continue
}
Expand Down Expand Up @@ -39,7 +39,7 @@ func (x *Block) Writes() []*Write {
for _, e := range x.ValidEnvelopes() {
for _, a := range e.TxActions() {
for _, rwSet := range a.NsReadWriteSet() {
for _, write := range rwSet.Rwset.Writes {
for _, write := range rwSet.GetRwset().GetWrites() {
blockWrite := &Write{
KWWrite: write,

Expand Down
5 changes: 3 additions & 2 deletions block/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
func (x *Transaction) Events() []*peer.ChaincodeEvent {
var events []*peer.ChaincodeEvent
for _, a := range x.Actions {
if a.GetPayload().GetAction().GetProposalResponsePayload().GetExtension().GetEvents() != nil {
events = append(events, a.Payload.Action.ProposalResponsePayload.Extension.Events)
event := a.GetPayload().GetAction().GetProposalResponsePayload().GetExtension().GetEvents()
if event != nil {
events = append(events, event)
}
}
return events
Expand Down
5 changes: 1 addition & 4 deletions block/tx_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ func ParseTxAction(txAction *peer.TransactionAction) (*TransactionAction, error)
chaincodeProposalPayload.Input.ChaincodeSpec.ChaincodeId.Version = ccEndorserAction.ProposalResponsePayload.Extension.ChaincodeId.Version
}

var responsePayload []byte
if ccEndorserAction.GetProposalResponsePayload().GetExtension().GetResponse() != nil {
responsePayload = ccEndorserAction.GetProposalResponsePayload().GetExtension().GetResponse().GetPayload()
}
responsePayload := ccEndorserAction.GetProposalResponsePayload().GetExtension().GetResponse().GetPayload()

return &TransactionAction{
Header: &SignatureHeader{
Expand Down
2 changes: 1 addition & 1 deletion buf.work.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ version: v1
directories:
- block
- service
- third_party
- third_party
3 changes: 1 addition & 2 deletions identity/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"path"

"github.com/pkg/errors"

"github.com/s7techlab/hlf-sdk-go/api"
)

Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (bp *BlockPeer) initChannels(ctx context.Context) {

for channel := range bp.peerChannels.Channels() {
if _, ok := bp.channelObservers[channel]; !ok {
bp.logger.Info(`add channel Observer`, zap.String(`channel`, channel))
bp.logger.Info(`add channel observer`, zap.String(`channel`, channel))

bp.channelObservers[channel] = bp.peerChannel(ctx, channel)
}
Expand All @@ -198,7 +198,7 @@ func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *BlockPeer

_, peerChannel.err = peerChannel.Observer.Observe(ctx)
if peerChannel.err != nil {
bp.logger.Warn(`init channel Observer`, zap.Error(peerChannel.err))
bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err))
}

// channel merger
Expand All @@ -207,7 +207,7 @@ func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *BlockPeer
bp.blocks <- b
}

// after all reads peerParsedChannel.Observer.blocks close channels
// after all reads peerParsedChannel.observer.blocks close channels
close(bp.blocks)
for _, blocks := range bp.blocksByChannels {
close(blocks)
Expand Down
4 changes: 2 additions & 2 deletions observer/block_peer_common_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChann

for channel := range bp.peerChannels.Channels() {
if _, ok := bp.channelObservers[channel]; !ok {
bp.logger.Info(`add channel Observer concurrently`, zap.String(`channel`, channel))
bp.logger.Info(`add channel observer concurrently`, zap.String(`channel`, channel))

bp.channelObservers[channel] = bp.peerChannelConcurrently(ctx, channel, blocksByChannels)
}
Expand All @@ -81,7 +81,7 @@ func (bp *BlockPeer) peerChannelConcurrently(ctx context.Context, channel string

_, peerChannel.err = peerChannel.Observer.Observe(ctx)
if peerChannel.err != nil {
bp.logger.Warn(`init channel Observer`, zap.Error(peerChannel.err))
bp.logger.Warn(`init channel observer`, zap.Error(peerChannel.err))
}

blocks := make(chan *Block)
Expand Down
4 changes: 2 additions & 2 deletions observer/block_peer_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var _ = Describe("Block Peer", func() {
channelPeerMockForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer Observer
// wait to commonBlockPeer observer
time.Sleep(time.Millisecond * 10)

channelObservers := commonBlockPeer.ChannelObservers()
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("Block Peer", func() {
channelPeerMockConcurrentlyForCommon.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to commonBlockPeer Observer
// wait to commonBlockPeer observer
time.Sleep(time.Millisecond * 200)

channelObservers := commonBlockPeerConcurrently.ChannelObservers()
Expand Down
6 changes: 3 additions & 3 deletions observer/block_peer_parsed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (pbp *ParsedBlockPeer) initParsedChannels(ctx context.Context) {

for channel := range pbp.blockPeer.peerChannels.Channels() {
if _, ok := pbp.parsedChannelObservers[channel]; !ok {
pbp.blockPeer.logger.Info(`add parsed channel Observer`, zap.String(`channel`, channel))
pbp.blockPeer.logger.Info(`add parsed channel observer`, zap.String(`channel`, channel))

pbp.parsedChannelObservers[channel] = pbp.peerParsedChannel(ctx, channel)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (pbp *ParsedBlockPeer) peerParsedChannel(ctx context.Context, channel strin

_, peerParsedChannel.err = peerParsedChannel.Observer.Observe(ctx)
if peerParsedChannel.err != nil {
pbp.blockPeer.logger.Warn(`init parsed channel Observer`, zap.Error(peerParsedChannel.err))
pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err))
}

// channel merger
Expand All @@ -173,7 +173,7 @@ func (pbp *ParsedBlockPeer) peerParsedChannel(ctx context.Context, channel strin
pbp.blocks <- b
}

// after all reads peerParsedChannel.Observer.blocks close channels
// after all reads peerParsedChannel.observer.blocks close channels
close(pbp.blocks)
for _, blocks := range pbp.blocksByChannels {
close(blocks)
Expand Down
4 changes: 2 additions & 2 deletions observer/block_peer_parsed_concurrently.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (pbp *ParsedBlockPeer) initParsedChannelsConcurrently(ctx context.Context,

for channel := range pbp.blockPeer.peerChannels.Channels() {
if _, ok := pbp.parsedChannelObservers[channel]; !ok {
pbp.blockPeer.logger.Info(`add parsed channel Observer concurrently`, zap.String(`channel`, channel))
pbp.blockPeer.logger.Info(`add parsed channel observer concurrently`, zap.String(`channel`, channel))

pbp.parsedChannelObservers[channel] = pbp.peerParsedChannelConcurrently(ctx, channel, blocksByChannels)
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (pbp *ParsedBlockPeer) peerParsedChannelConcurrently(ctx context.Context, c

_, peerParsedChannel.err = peerParsedChannel.Observer.Observe(ctx)
if peerParsedChannel.err != nil {
pbp.blockPeer.logger.Warn(`init parsed channel Observer`, zap.Error(peerParsedChannel.err))
pbp.blockPeer.logger.Warn(`init parsed channel observer`, zap.Error(peerParsedChannel.err))
}

blocks := make(chan *ParsedBlock)
Expand Down
4 changes: 2 additions & 2 deletions observer/block_peer_parsed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var _ = Describe("Block Peer", func() {
channelPeerMockForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to parsedBlockPeer Observer
// wait to parsedBlockPeer observer
time.Sleep(time.Second + time.Millisecond*10)

channelObservers := parsedBlockPeer.ChannelObservers()
Expand Down Expand Up @@ -144,7 +144,7 @@ var _ = Describe("Block Peer", func() {
channelPeerMockConcurrentlyForParsed.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel})
}

// wait to blockPeer Observer
// wait to blockPeer observer
time.Sleep(time.Millisecond * 200)

channelObservers := parsedBlockPeerConcurrently.ChannelObservers()
Expand Down
5 changes: 1 addition & 4 deletions observer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)

var ErrChannelObserverAlreadyStarted = errors.New(`channel Observer already started`)
var ErrChannelObserverAlreadyStarted = errors.New(`channel observer already started`)

type (
SeekFromFetcher func(ctx context.Context, channel string) (uint64, error)
Expand Down Expand Up @@ -43,9 +43,6 @@ type (
// when we subscribed to channel
connectedAt time.Time

// number of last fetched Observer
// lastFetchedBlock uint64

// last errors we got
lastError error

Expand Down
2 changes: 1 addition & 1 deletion observer/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ package observer
// }
//}
//
//// WithChannels - specify channels and from which Observer you want to get blocks from channel
//// WithChannels - specify channels and from which observer you want to get blocks from channel
//// sets channels 'name'/'seekFromBlock' settings
//// Name could be regex pattern and MUST begin and end with '/'. Example '/channel.*/'
// func WithChannels(subscribedChannels ...ChannelSetting) ObserverOpt {
Expand Down
2 changes: 1 addition & 1 deletion observer/transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package observer

// BlockTransformer transforms parsed Observer data. For example decrypt, or transformer protobuf state to json
// BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json
type BlockTransformer interface {
Transform(*ParsedBlock) error
}

0 comments on commit a465395

Please sign in to comment.