Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of anchoring via KAS API #591

Merged
merged 4 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/utils/nodecmd/dumpconfigcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func makeServiceChainConfig(ctx *cli.Context) (config sc.SCConfig) {
cfg.KASAnchor = ctx.GlobalBool(utils.KASServiceChainAnchorFlag.Name)
if cfg.KASAnchor {
cfg.KASAnchorPeriod = ctx.GlobalUint64(utils.KASServiceChainAnchorPeriodFlag.Name)
if cfg.KASAnchorPeriod == 0 {
cfg.KASAnchorPeriod = 1
logger.Warn("KAS anchor period is set by 1")
}

cfg.KASAnchorUrl = ctx.GlobalString(utils.KASServiceChainAnchorUrlFlag.Name)
if cfg.KASAnchorUrl == "" {
Expand Down
5 changes: 5 additions & 0 deletions console/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,11 @@ web3._extend({
call: 'subbridge_unsubscribeBridge',
params: 2
}),
new web3._extend.Method({
name: 'KASAnchor',
call: 'subbridge_kASAnchor',
params: 1
}),
new web3._extend.Method({
name: 'anchoring',
call: 'subbridge_anchoring',
Expand Down
77 changes: 77 additions & 0 deletions kas/anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/klaytn/klaytn/blockchain/types"
"github.com/klaytn/klaytn/common"
"math/big"
"net/http"
)

Expand Down Expand Up @@ -66,6 +68,78 @@ func NewKASAnchor(kasConfig *KASConfig, db AnchorDB, bc BlockChain) *Anchor {
}
}

// AnchorPeriodicBlock periodically anchor blocks to KAS.
// if given block is invalid, it does nothing.
func (anchor *Anchor) AnchorPeriodicBlock(block *types.Block) {
if !anchor.kasConfig.Anchor {
return
}

if block == nil {
logger.Error("KAS Anchor : can not anchor nil block")
return
}

if block.NumberU64()%anchor.kasConfig.AnchorPeriod != 0 {
return
}

if err := anchor.AnchorBlock(block); err != nil {
logger.Warn("Failed to anchor a block via KAS", "blkNum", block.NumberU64())
}
}

// blockToAnchoringDataInternalType0 makes AnchoringDataInternalType0 from the given block.
// TxCount is the number of transactions of the last N blocks. (N is a anchor period.)
func (anchor *Anchor) blockToAnchoringDataInternalType0(block *types.Block) *types.AnchoringDataInternalType0 {
aidan-kwon marked this conversation as resolved.
Show resolved Hide resolved
start := uint64(0)
if block.NumberU64() >= anchor.kasConfig.AnchorPeriod {
start = block.NumberU64() - anchor.kasConfig.AnchorPeriod + 1
}
blkCnt := block.NumberU64() - start + 1

txCount := len(block.Body().Transactions)
for i := start; i < block.NumberU64(); i++ {
block := anchor.bc.GetBlockByNumber(i)
if block == nil {
return nil
}
txCount += len(block.Body().Transactions)
}

return &types.AnchoringDataInternalType0{
BlockHash: block.Hash(),
TxHash: block.Header().TxHash,
ParentHash: block.Header().ParentHash,
ReceiptHash: block.Header().ReceiptHash,
StateRootHash: block.Header().Root,
BlockNumber: block.Header().Number,
BlockCount: new(big.Int).SetUint64(blkCnt),
TxCount: big.NewInt(int64(txCount)),
}
}

// AnchorBlock converts given block to payload and anchor the payload via KAS anchor API.
func (anchor *Anchor) AnchorBlock(block *types.Block) error {
anchorData := anchor.blockToAnchoringDataInternalType0(block)

payload := dataToPayload(anchorData)

res, err := anchor.sendRequest(payload)
if err != nil {
return err
}

if res.Code != codeOK {
result, _ := json.Marshal(res)
logger.Debug("Failed to anchor a block via KAS", "blkNum", block.NumberU64(), "result", string(result))
ehnuje marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("error code %v", res.Code)
}

logger.Info("Anchored a block via KAS", "blkNum", block.NumberU64())
return nil
}

type respBody struct {
Code int `json:"code"`
Result interface{} `json:"result"`
Expand All @@ -81,6 +155,7 @@ type Payload struct {
types.AnchoringDataInternalType0
}

// dataToPayload wraps given AnchoringDataInternalType0 to payload with `id` field.
func dataToPayload(anchorData *types.AnchoringDataInternalType0) *Payload {
payload := &Payload{
Id: anchorData.BlockNumber.String(),
Expand All @@ -89,6 +164,8 @@ func dataToPayload(anchorData *types.AnchoringDataInternalType0) *Payload {

return payload
}

// sendRequest requests to KAS anchor API with given payload.
func (anchor *Anchor) sendRequest(payload interface{}) (*respBody, error) {
header := map[string]string{
"Content-Type": "application/json",
Expand Down
109 changes: 109 additions & 0 deletions kas/anchor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"github.com/golang/mock/gomock"
"github.com/klaytn/klaytn/blockchain/types"
"github.com/klaytn/klaytn/common"
"github.com/klaytn/klaytn/crypto"
"github.com/klaytn/klaytn/kas/mocks"
"github.com/stretchr/testify/assert"
"io/ioutil"
"math/big"
"math/rand"
"net/http"
"strconv"
"testing"
Expand All @@ -48,6 +50,40 @@ func testAnchorData() *types.AnchoringDataInternalType0 {
TxCount: big.NewInt(7),
}
}

func TestExampleSendRequest(t *testing.T) {
url := "http://anchor.wallet-api.dev.klaytn.com/v1/anchor"
xkrn := "krn:1001:anchor:test:operator-pool:op1"
user := "78ab9116689659321aaf472aa154eac7dd7a99c6"
pwd := "403e0397d51a823cd59b7edcb212788c8599dd7e"

operator := common.StringToAddress("0x1552F52D459B713E0C4558e66C8c773a75615FA8")

// Anchor Data
anchorData := testAnchorData()

kasConfig := &KASConfig{
Url: url,
Xkrn: xkrn,
User: user,
Pwd: pwd,
Operator: operator,
Anchor: true,
AnchorPeriod: 1,
}

kasAnchor := NewKASAnchor(kasConfig, nil, nil)

payload := dataToPayload(anchorData)
res, err := kasAnchor.sendRequest(payload)
assert.NoError(t, err)

result, err := json.Marshal(res)
assert.NoError(t, err)

t.Log(string(result))
}

func TestSendRequest(t *testing.T) {
config := KASConfig{}
anchor := NewKASAnchor(&config, nil, nil)
Expand Down Expand Up @@ -84,3 +120,76 @@ func TestSendRequest(t *testing.T) {
assert.Nil(t, resp)
}
}

func TestDataToPayload(t *testing.T) {
anchorData := testAnchorData()
pl := dataToPayload(anchorData)
assert.Equal(t, anchorData.BlockNumber.String(), pl.Id)
assert.Equal(t, *anchorData, pl.AnchoringDataInternalType0)
}

func TestBlockToAnchoringDataInternalType0(t *testing.T) {
testBlockToAnchoringDataInternalType0(t, 1)
testBlockToAnchoringDataInternalType0(t, 7)
testBlockToAnchoringDataInternalType0(t, 100)
}

func testBlockToAnchoringDataInternalType0(t *testing.T, period uint64) {
random := rand.New(rand.NewSource(0))

config := KASConfig{
Anchor: true,
AnchorPeriod: period,
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
bc := mocks.NewMockBlockChain(ctrl)

anchor := NewKASAnchor(&config, nil, bc)
testBlkN := uint64(100)
pastCnt := [100]uint64{}
txCnt := uint64(0)

for blkNum := uint64(0); blkNum < testBlkN; blkNum++ {
// Gen random block
header := &types.Header{Number: big.NewInt(int64(blkNum))}
block := types.NewBlockWithHeader(header)
txNum := random.Uint64() % 10
txs, _ := genTransactions(txNum)
body := &types.Body{Transactions: txs}
block = block.WithBody(body.Transactions)

// update blockchain mock
bc.EXPECT().GetBlockByNumber(blkNum).Return(block).AnyTimes()

// call target func
result := anchor.blockToAnchoringDataInternalType0(block)

// calc expected value
txCnt -= pastCnt[blkNum%anchor.kasConfig.AnchorPeriod]
pastCnt[blkNum%anchor.kasConfig.AnchorPeriod] = txNum
txCnt += txNum

// compare result
assert.Equal(t, txCnt, result.TxCount.Uint64(), "blkNum:%v", blkNum)
}
}

func genTransactions(n uint64) (types.Transactions, error) {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
signer := types.NewEIP155Signer(big.NewInt(18))

txs := types.Transactions{}

for i := uint64(0); i < n; i++ {
tx, _ := types.SignTx(
types.NewTransaction(0, addr,
big.NewInt(int64(n)), 0, big.NewInt(int64(n)), nil), signer, key)

txs = append(txs, tx)
}

return txs, nil
}
12 changes: 12 additions & 0 deletions node/sc/api_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,18 @@ func (sb *SubBridgeAPI) GetBridgeInformation(bridgeAddr common.Address) (map[str
}, nil
}

func (sb *SubBridgeAPI) KASAnchor(blkNum uint64) error {
block := sb.subBridge.blockchain.GetBlockByNumber(blkNum)
if block != nil {
if err := sb.subBridge.kasAnchor.AnchorBlock(block); err != nil {
logger.Error("Failed to anchor a block via KAS", "blkNum", block.NumberU64())
return err
}
return nil
}
return errInvalidBlock
}

func (sb *SubBridgeAPI) Anchoring(flag bool) bool {
return sb.subBridge.SetAnchoringTx(flag)
}
Expand Down
6 changes: 3 additions & 3 deletions node/sc/mainbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
)

const (
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10000
// chainEventChanSize is the size of channel listening to ChainHeadEvent.
chainEventChanSize = 10000
chainLogChanSize = 10000
transactionChanSize = 10000
rpcBufferSize = 1024
Expand Down Expand Up @@ -126,7 +126,7 @@ func NewMainBridge(ctx *node.ServiceContext, config *SCConfig) (*MainBridge, err
accountManager: ctx.AccountManager,
networkId: config.NetworkId,
ctx: ctx,
chainHeadCh: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
chainHeadCh: make(chan blockchain.ChainHeadEvent, chainEventChanSize),
logsCh: make(chan []*types.Log, chainLogChanSize),
txCh: make(chan blockchain.NewTxsEvent, transactionChanSize),
quitSync: make(chan struct{}),
Expand Down
36 changes: 27 additions & 9 deletions node/sc/subbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/klaytn/klaytn/common"
"github.com/klaytn/klaytn/crypto"
"github.com/klaytn/klaytn/event"
"github.com/klaytn/klaytn/kas"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
"github.com/klaytn/klaytn/networks/rpc"
Expand Down Expand Up @@ -131,10 +132,10 @@ type SubBridge struct {
bridgeTxPool BridgeTxPool

// chain event
chainHeadCh chan blockchain.ChainHeadEvent
chainHeadSub event.Subscription
logsCh chan []*types.Log
logsSub event.Subscription
chainCh chan blockchain.ChainEvent
chainSub event.Subscription
logsCh chan []*types.Log
logsSub event.Subscription

// If this channel can't be read immediately, it can lock service chain tx pool.
// Commented out because for now, it doesn't need.
Expand Down Expand Up @@ -164,6 +165,9 @@ type SubBridge struct {

rpcConn net.Conn
rpcSendCh chan []byte

//KAS Anchor
kasAnchor *kas.Anchor
}

// New creates a new CN object (including the
Expand All @@ -183,7 +187,7 @@ func NewSubBridge(ctx *node.ServiceContext, config *SCConfig) (*SubBridge, error
accountManager: ctx.AccountManager,
networkId: config.NetworkId,
ctx: ctx,
chainHeadCh: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
chainCh: make(chan blockchain.ChainEvent, chainEventChanSize),
logsCh: make(chan []*types.Log, chainLogChanSize),
// txCh: make(chan blockchain.NewTxsEvent, transactionChanSize),
requestEventCh: make(chan *RequestValueTransferEvent, requestEventChanSize),
Expand Down Expand Up @@ -320,8 +324,20 @@ func (sb *SubBridge) SetComponents(components []interface{}) {
switch v := component.(type) {
case *blockchain.BlockChain:
sb.blockchain = v

kasConfig := &kas.KASConfig{
Url: sb.config.KASAnchorUrl,
Xkrn: sb.config.KASXKRN,
User: sb.config.KASAccessKey,
Pwd: sb.config.KASSecretKey,
Operator: common.HexToAddress(sb.config.KASAnchorOperator),
Anchor: sb.config.KASAnchor,
AnchorPeriod: sb.config.KASAnchorPeriod,
}
sb.kasAnchor = kas.NewKASAnchor(kasConfig, sb.chainDB, v)

// event from core-service
sb.chainHeadSub = sb.blockchain.SubscribeChainHeadEvent(sb.chainHeadCh)
sb.chainSub = sb.blockchain.SubscribeChainEvent(sb.chainCh)
sb.logsSub = sb.blockchain.SubscribeLogsEvent(sb.logsCh)
sb.bridgeAccounts.cAccount.SetChainID(v.Config().ChainID)
case *blockchain.TxPool:
Expand Down Expand Up @@ -592,11 +608,13 @@ func (sb *SubBridge) loop() {
case sendData := <-sb.rpcSendCh:
sb.SendRPCData(sendData)
// Handle ChainHeadEvent
case ev := <-sb.chainHeadCh:
case ev := <-sb.chainCh:
if ev.Block != nil {
if err := sb.eventhandler.HandleChainHeadEvent(ev.Block); err != nil {
logger.Error("subbridge block event", "err", err)
}

sb.kasAnchor.AnchorPeriodicBlock(ev.Block)
} else {
logger.Error("subbridge block event is nil")
}
Expand Down Expand Up @@ -625,7 +643,7 @@ func (sb *SubBridge) loop() {
if err := sb.eventhandler.ProcessHandleEvent(ev); err != nil {
logger.Error("fail to process handle value transfer event ", "err", err)
}
case err := <-sb.chainHeadSub.Err():
case err := <-sb.chainSub.Err():
if err != nil {
logger.Error("subbridge block subscription ", "err", err)
}
Expand Down Expand Up @@ -729,7 +747,7 @@ func (sb *SubBridge) Stop() error {
close(sb.quitSync)
sb.bridgeManager.stopAllRecoveries()

sb.chainHeadSub.Unsubscribe()
sb.chainSub.Unsubscribe()
//sb.txSub.Unsubscribe()
sb.logsSub.Unsubscribe()
sb.requestEventSub.Unsubscribe()
Expand Down